3614 lines
158 KiB
Python
3614 lines
158 KiB
Python
|
|
import asyncio
|
|||
|
|
import json
|
|||
|
|
import logging
|
|||
|
|
import multiprocessing
|
|||
|
|
import subprocess
|
|||
|
|
import time
|
|||
|
|
from collections import defaultdict
|
|||
|
|
from concurrent.futures import ThreadPoolExecutor
|
|||
|
|
from datetime import datetime
|
|||
|
|
from typing import List, Dict, Any, Optional
|
|||
|
|
import queue
|
|||
|
|
import os
|
|||
|
|
import sys
|
|||
|
|
import psutil
|
|||
|
|
import threading
|
|||
|
|
|
|||
|
|
import av
|
|||
|
|
import cv2
|
|||
|
|
import numpy as np
|
|||
|
|
import torch
|
|||
|
|
from multiprocessing import Queue, Process, Event, Manager
|
|||
|
|
|
|||
|
|
|
|||
|
|
from cv_back_video import cal_tricker_results
|
|||
|
|
from middleware.MQTTService import MQTTService, MQTTDevice, empty_osd_callback
|
|||
|
|
from middleware.entity.air_attitude import Air_Attitude
|
|||
|
|
from middleware.entity.camera_para import read_camera_params, Camera_Para
|
|||
|
|
from middleware.minio_util import downFile, upload_frame_buff_from_buffer, upload_video_buff_from_buffer
|
|||
|
|
from touying.ImageReproject_python.cal_func import red_line_reproject, cal_canv_location_by_osd
|
|||
|
|
from yolo.cv_multi_model_back_video import put_chinese_text, is_point_in_polygonlist, TrackIDEventFilter, \
|
|||
|
|
haversine, frames_to_video_bytes
|
|||
|
|
from yolo.detect.multi_yolo_trt_detect_track_trt8 import MultiYoloTrtDetectorTrackId_TRT8
|
|||
|
|
from yolo.detect.multi_yolo_trt_detect_track_trt10_yolo11 import MultiYoloTrtDetectorTrackId_TRT10_YOLO11
|
|||
|
|
|
|||
|
|
|
|||
|
|
# 配置日志
|
|||
|
|
logging.basicConfig(
|
|||
|
|
level=logging.INFO,
|
|||
|
|
format='%(asctime)s - [PID:%(process)d] - %(name)s - %(levelname)s - %(message)s'
|
|||
|
|
)
|
|||
|
|
logger = logging.getLogger(__name__)
|
|||
|
|
|
|||
|
|
|
|||
|
|
class Config:
|
|||
|
|
MAX_WORKERS = 8
|
|||
|
|
FRAME_QUEUE_SIZE = 25
|
|||
|
|
PROCESSED_QUEUE_SIZE = 60
|
|||
|
|
Timestamped_Queue_SIZE = 500
|
|||
|
|
RETRY_COUNT = 3
|
|||
|
|
READ_RTMP_WORKERS = 2
|
|||
|
|
PROCESS_FRAME_WORKERS = 2
|
|||
|
|
UPLOAD_WORKERS = 2
|
|||
|
|
WRITE_FRAME_WORKERS = 2
|
|||
|
|
INVADE_WORKERS = 2
|
|||
|
|
EVENT_VIDEO_WORKERS = 2
|
|||
|
|
MAX_QUEUE_WARN_SIZE = 15
|
|||
|
|
MODEL_INPUT_SIZE = (640, 640)
|
|||
|
|
DEFAULT_DEVICE = "cuda:0" if torch.cuda.is_available() else "cpu"
|
|||
|
|
TARGET_FPS = 25
|
|||
|
|
MAX_FRAME_DROP = 5
|
|||
|
|
PERF_LOG_INTERVAL = 5.0
|
|||
|
|
|
|||
|
|
# CPU隔离配置 - 使用独立的CPU核心
|
|||
|
|
CPU_ALLOCATIONS = {
|
|||
|
|
'process_a': { # RTMP处理进程 - 高优先级,高速推理
|
|||
|
|
'cpus': [0, 1, 2, 3], # 使用4个CPU核心
|
|||
|
|
'numa_node': 0,
|
|||
|
|
'priority': 'high', # 高优先级
|
|||
|
|
'sched_policy': 'fifo' # FIFO调度
|
|||
|
|
},
|
|||
|
|
'process_b': { # 上传进程 - 低优先级,后台处理
|
|||
|
|
'cpus': [4, 5], # 使用2个CPU核心
|
|||
|
|
'numa_node': 0,
|
|||
|
|
'priority': 'low', # 低优先级
|
|||
|
|
'sched_policy': 'other'
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
|
|||
|
|
async def detect_video_resolution(loop, executor, video_url):
|
|||
|
|
"""探测视频流的分辨率"""
|
|||
|
|
try:
|
|||
|
|
logger.info(f"开始探测视频分辨率: {video_url}")
|
|||
|
|
|
|||
|
|
simple_cmd = [
|
|||
|
|
'ffprobe',
|
|||
|
|
'-v', 'error',
|
|||
|
|
'-select_streams', 'v:0',
|
|||
|
|
'-show_entries', 'stream=width,height',
|
|||
|
|
'-of', 'csv=p=0:s=x',
|
|||
|
|
video_url
|
|||
|
|
]
|
|||
|
|
|
|||
|
|
def run_simple_probe():
|
|||
|
|
try:
|
|||
|
|
result = subprocess.run(simple_cmd, capture_output=True, text=True, timeout=15)
|
|||
|
|
if result.returncode == 0 and result.stdout.strip():
|
|||
|
|
dimensions = result.stdout.strip().split('x')
|
|||
|
|
if len(dimensions) == 2:
|
|||
|
|
width = int(dimensions[0])
|
|||
|
|
height = int(dimensions[1])
|
|||
|
|
if width > 0 and height > 0:
|
|||
|
|
return width, height
|
|||
|
|
except Exception as e:
|
|||
|
|
logger.warning(f"简单分辨率探测失败: {e}")
|
|||
|
|
return None
|
|||
|
|
|
|||
|
|
dimensions = await loop.run_in_executor(executor, run_simple_probe)
|
|||
|
|
if dimensions:
|
|||
|
|
width, height = dimensions
|
|||
|
|
logger.info(f"探测到视频分辨率: {width}x{height}")
|
|||
|
|
return width, height
|
|||
|
|
|
|||
|
|
logger.warning("所有分辨率探测方法都失败,使用默认值 1920x1080")
|
|||
|
|
return 1920, 1080
|
|||
|
|
|
|||
|
|
except Exception as e:
|
|||
|
|
logger.error(f"分辨率探测异常: {e}")
|
|||
|
|
logger.warning("使用默认分辨率 1920x1080")
|
|||
|
|
return 1920, 1080
|
|||
|
|
|
|||
|
|
|
|||
|
|
class CPUIsolator:
|
|||
|
|
"""CPU隔离管理器 - 高性能版本"""
|
|||
|
|
|
|||
|
|
def __init__(self):
|
|||
|
|
self.total_cpus = psutil.cpu_count(logical=False)
|
|||
|
|
self.logical_cpus = psutil.cpu_count(logical=True)
|
|||
|
|
|
|||
|
|
logger.info(f"系统CPU信息:")
|
|||
|
|
logger.info(f" - 物理核心数: {self.total_cpus}")
|
|||
|
|
logger.info(f" - 逻辑核心数: {self.logical_cpus}")
|
|||
|
|
|
|||
|
|
def isolate_process(self, pid: int, config: Dict) -> bool:
|
|||
|
|
"""高性能CPU隔离"""
|
|||
|
|
try:
|
|||
|
|
cpu_list = config.get('cpus', [])
|
|||
|
|
priority = config.get('priority', 'normal')
|
|||
|
|
sched_policy = config.get('sched_policy', 'other')
|
|||
|
|
|
|||
|
|
logger.info(f"隔离进程 {pid} 到CPU: {cpu_list}, 优先级: {priority}, 调度策略: {sched_policy}")
|
|||
|
|
|
|||
|
|
process = psutil.Process(pid)
|
|||
|
|
|
|||
|
|
# 1. 设置CPU亲和性
|
|||
|
|
process.cpu_affinity(cpu_list)
|
|||
|
|
|
|||
|
|
# 2. 设置调度策略和优先级 (Linux)
|
|||
|
|
if sys.platform == "linux":
|
|||
|
|
try:
|
|||
|
|
if sched_policy == 'fifo':
|
|||
|
|
# SCHED_FIFO: 实时调度,最高优先级
|
|||
|
|
param = os.sched_param(os.sched_get_priority_max(os.SCHED_FIFO))
|
|||
|
|
os.sched_setscheduler(pid, os.SCHED_FIFO, param)
|
|||
|
|
elif sched_policy == 'rr':
|
|||
|
|
# SCHED_RR: 轮转调度
|
|||
|
|
param = os.sched_param(os.sched_get_priority_max(os.SCHED_RR))
|
|||
|
|
os.sched_setscheduler(pid, os.SCHED_RR, param)
|
|||
|
|
elif sched_policy == 'batch':
|
|||
|
|
# SCHED_BATCH: 批处理调度
|
|||
|
|
param = os.sched_param(0)
|
|||
|
|
os.sched_setscheduler(pid, os.SCHED_BATCH, param)
|
|||
|
|
except Exception as e:
|
|||
|
|
logger.warning(f"设置调度策略失败: {e}")
|
|||
|
|
|
|||
|
|
# 3. 设置nice值
|
|||
|
|
try:
|
|||
|
|
if priority == 'high':
|
|||
|
|
os.nice(-19) # 最高优先级
|
|||
|
|
elif priority == 'low':
|
|||
|
|
os.nice(19) # 最低优先级
|
|||
|
|
else:
|
|||
|
|
os.nice(0) # 正常优先级
|
|||
|
|
except:
|
|||
|
|
pass
|
|||
|
|
|
|||
|
|
# 验证隔离
|
|||
|
|
current_affinity = process.cpu_affinity()
|
|||
|
|
logger.info(f"进程 {pid} 隔离完成: CPU亲和性={current_affinity}")
|
|||
|
|
|
|||
|
|
return True
|
|||
|
|
|
|||
|
|
except Exception as e:
|
|||
|
|
logger.error(f"进程隔离失败: {e}")
|
|||
|
|
return False
|
|||
|
|
|
|||
|
|
|
|||
|
|
def check_cpu_affinity():
|
|||
|
|
"""检查当前进程的CPU亲和性"""
|
|||
|
|
pid = os.getpid()
|
|||
|
|
try:
|
|||
|
|
process = psutil.Process(pid)
|
|||
|
|
affinity = process.cpu_affinity()
|
|||
|
|
|
|||
|
|
# 检查调度策略
|
|||
|
|
sched_policy = "unknown"
|
|||
|
|
if sys.platform == "linux":
|
|||
|
|
try:
|
|||
|
|
policy = os.sched_getscheduler(pid)
|
|||
|
|
if policy == os.SCHED_FIFO:
|
|||
|
|
sched_policy = "SCHED_FIFO"
|
|||
|
|
elif policy == os.SCHED_RR:
|
|||
|
|
sched_policy = "SCHED_RR"
|
|||
|
|
elif policy == os.SCHED_BATCH:
|
|||
|
|
sched_policy = "SCHED_BATCH"
|
|||
|
|
elif policy == os.SCHED_OTHER:
|
|||
|
|
sched_policy = "SCHED_OTHER"
|
|||
|
|
except:
|
|||
|
|
pass
|
|||
|
|
|
|||
|
|
logger.info(f"进程 {pid} CPU状态: 亲和性={affinity}, 调度策略={sched_policy}")
|
|||
|
|
|
|||
|
|
return affinity
|
|||
|
|
except Exception as e:
|
|||
|
|
logger.warning(f"检查CPU状态失败: {e}")
|
|||
|
|
return []
|
|||
|
|
|
|||
|
|
|
|||
|
|
class ProcessGroupManager:
|
|||
|
|
"""进程组管理器 - 高性能版本"""
|
|||
|
|
|
|||
|
|
def __init__(self):
|
|||
|
|
self.manager = Manager()
|
|||
|
|
self.process_groups = {}
|
|||
|
|
self.group_queues = {}
|
|||
|
|
self.cpu_isolator = CPUIsolator()
|
|||
|
|
|
|||
|
|
def create_process_group(self, group_id: str,
|
|||
|
|
video_url: str, task_id: str, model_configs: List[Dict],
|
|||
|
|
mqtt_pub_ip: str, mqtt_pub_port: int, mqtt_pub_topic: str,
|
|||
|
|
mqtt_sub_ip: str, mqtt_sub_port: int, mqtt_sub_topic: str,
|
|||
|
|
output_rtmp_url: str,
|
|||
|
|
invade_enable: bool, invade_file: str, camera_para_url: str,
|
|||
|
|
device_height: float, repeat_dis: float, repeat_time: float):
|
|||
|
|
"""创建进程组"""
|
|||
|
|
|
|||
|
|
# 创建队列
|
|||
|
|
group_queues = {
|
|||
|
|
'cv_frame_queue': Queue(maxsize=Config.PROCESSED_QUEUE_SIZE),
|
|||
|
|
'event_queue': Queue(maxsize=Config.PROCESSED_QUEUE_SIZE),
|
|||
|
|
'invade_queue': Queue(maxsize=Config.PROCESSED_QUEUE_SIZE),
|
|||
|
|
'timestamp_frame_queue': Queue(maxsize=Config.Timestamped_Queue_SIZE),
|
|||
|
|
'stop_event': Event(),
|
|||
|
|
'error_queue': Queue(),
|
|||
|
|
'performance_counter': Manager().Value('i', 0) # 性能计数器
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
# 创建进程A (RTMP处理,高优先级)
|
|||
|
|
process_a = Process(
|
|||
|
|
target=rtmp_process_main,
|
|||
|
|
name=f"{group_id}_ProcessA",
|
|||
|
|
args=(group_id, video_url, task_id, model_configs,
|
|||
|
|
mqtt_pub_ip, mqtt_pub_port, mqtt_pub_topic,
|
|||
|
|
mqtt_sub_ip, mqtt_sub_port, mqtt_sub_topic,
|
|||
|
|
output_rtmp_url, invade_enable, invade_file,
|
|||
|
|
camera_para_url, group_queues)
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
process_a.start()
|
|||
|
|
|
|||
|
|
# CPU隔离
|
|||
|
|
logger.info(f"隔离进程A (PID:{process_a.pid})")
|
|||
|
|
self.cpu_isolator.isolate_process(process_a.pid, Config.CPU_ALLOCATIONS['process_a'])
|
|||
|
|
|
|||
|
|
time.sleep(1)
|
|||
|
|
|
|||
|
|
if not process_a.is_alive():
|
|||
|
|
logger.error("进程A启动失败")
|
|||
|
|
return None
|
|||
|
|
|
|||
|
|
# 创建进程B (上传处理,低优先级) - 使用正确的函数名
|
|||
|
|
process_b = Process(
|
|||
|
|
target=upload_process_main, # 修改为正确的函数名
|
|||
|
|
name=f"{group_id}_ProcessB",
|
|||
|
|
args=(group_id, task_id, mqtt_pub_ip, mqtt_pub_port,
|
|||
|
|
mqtt_pub_topic, device_height, repeat_dis,
|
|||
|
|
repeat_time, group_queues)
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
process_b.start()
|
|||
|
|
|
|||
|
|
# CPU隔离
|
|||
|
|
logger.info(f"隔离进程B (PID:{process_b.pid})")
|
|||
|
|
self.cpu_isolator.isolate_process(process_b.pid, Config.CPU_ALLOCATIONS['process_b'])
|
|||
|
|
|
|||
|
|
time.sleep(1)
|
|||
|
|
|
|||
|
|
if not process_b.is_alive():
|
|||
|
|
logger.error("进程B启动失败")
|
|||
|
|
if process_a.is_alive():
|
|||
|
|
process_a.terminate()
|
|||
|
|
return None
|
|||
|
|
|
|||
|
|
process_list = [process_a, process_b]
|
|||
|
|
|
|||
|
|
# 存储信息
|
|||
|
|
self.process_groups[group_id] = {
|
|||
|
|
'process_list': process_list,
|
|||
|
|
'status': 'running',
|
|||
|
|
'created_at': datetime.now(),
|
|||
|
|
'task_id': task_id,
|
|||
|
|
'video_url': video_url
|
|||
|
|
}
|
|||
|
|
self.group_queues[group_id] = group_queues
|
|||
|
|
|
|||
|
|
logger.info(f"进程组 {group_id} 创建完成")
|
|||
|
|
|
|||
|
|
# 启动监控
|
|||
|
|
self.start_performance_monitor(group_id)
|
|||
|
|
|
|||
|
|
return group_id
|
|||
|
|
|
|||
|
|
def start_performance_monitor(self, group_id: str):
|
|||
|
|
"""启动性能监控线程"""
|
|||
|
|
|
|||
|
|
def monitor():
|
|||
|
|
last_time = time.time()
|
|||
|
|
|
|||
|
|
while group_id in self.process_groups:
|
|||
|
|
try:
|
|||
|
|
current_time = time.time()
|
|||
|
|
elapsed = current_time - last_time
|
|||
|
|
|
|||
|
|
if elapsed >= 5.0: # 每5秒监控一次
|
|||
|
|
# 检查进程状态
|
|||
|
|
group_info = self.process_groups.get(group_id)
|
|||
|
|
if not group_info:
|
|||
|
|
break
|
|||
|
|
|
|||
|
|
process_list = group_info.get('process_list', [])
|
|||
|
|
|
|||
|
|
for i, process in enumerate(process_list):
|
|||
|
|
if process and process.is_alive():
|
|||
|
|
try:
|
|||
|
|
proc = psutil.Process(process.pid)
|
|||
|
|
cpu_affinity = proc.cpu_affinity()
|
|||
|
|
cpu_percent = proc.cpu_percent(interval=0.5)
|
|||
|
|
memory_mb = proc.memory_info().rss / 1024 / 1024
|
|||
|
|
|
|||
|
|
if i == 0:
|
|||
|
|
logger.info(
|
|||
|
|
f"进程A性能 - CPU:{cpu_affinity}, 使用率:{cpu_percent:.1f}%, 内存:{memory_mb:.1f}MB")
|
|||
|
|
else:
|
|||
|
|
logger.info(
|
|||
|
|
f"进程B性能 - CPU:{cpu_affinity}, 使用率:{cpu_percent:.1f}%, 内存:{memory_mb:.1f}MB")
|
|||
|
|
|
|||
|
|
except Exception as e:
|
|||
|
|
logger.warning(f"监控进程状态失败: {e}")
|
|||
|
|
|
|||
|
|
last_time = current_time
|
|||
|
|
|
|||
|
|
time.sleep(1)
|
|||
|
|
|
|||
|
|
except Exception as e:
|
|||
|
|
logger.warning(f"监控线程异常: {e}")
|
|||
|
|
time.sleep(5)
|
|||
|
|
|
|||
|
|
monitor_thread = threading.Thread(
|
|||
|
|
target=monitor,
|
|||
|
|
name=f"{group_id}_monitor",
|
|||
|
|
daemon=True
|
|||
|
|
)
|
|||
|
|
monitor_thread.start()
|
|||
|
|
|
|||
|
|
def stop_process_group(self, group_id: str, timeout: int = 10):
|
|||
|
|
"""停止进程组"""
|
|||
|
|
if group_id not in self.process_groups:
|
|||
|
|
return False
|
|||
|
|
|
|||
|
|
logger.info(f"停止进程组 {group_id}")
|
|||
|
|
group_info = self.process_groups[group_id]
|
|||
|
|
group_queues = self.group_queues.get(group_id, {})
|
|||
|
|
|
|||
|
|
# 发送停止信号
|
|||
|
|
stop_event = group_queues.get('stop_event')
|
|||
|
|
if stop_event:
|
|||
|
|
stop_event.set()
|
|||
|
|
|
|||
|
|
# 等待进程结束
|
|||
|
|
process_list = group_info.get('process_list')
|
|||
|
|
|
|||
|
|
for process in process_list:
|
|||
|
|
if process and process.is_alive():
|
|||
|
|
process.join(timeout=timeout)
|
|||
|
|
if process.is_alive():
|
|||
|
|
process.terminate()
|
|||
|
|
|
|||
|
|
if group_id in self.group_queues:
|
|||
|
|
del self.group_queues[group_id]
|
|||
|
|
|
|||
|
|
group_info['status'] = 'stopped'
|
|||
|
|
group_info['stopped_at'] = datetime.now()
|
|||
|
|
logger.info(f"进程组 {group_id} 已停止")
|
|||
|
|
return True
|
|||
|
|
|
|||
|
|
def stop_all_groups(self, timeout: int = 10):
|
|||
|
|
"""停止所有进程组"""
|
|||
|
|
logger.info("停止所有进程组")
|
|||
|
|
for group_id in list(self.process_groups.keys()):
|
|||
|
|
self.stop_process_group(group_id, timeout)
|
|||
|
|
logger.info("所有进程组已停止")
|
|||
|
|
|
|||
|
|
|
|||
|
|
# # #
|
|||
|
|
async def read_rtmp_frames(
|
|||
|
|
loop,
|
|||
|
|
read_rtmp_frames_executor: ThreadPoolExecutor,
|
|||
|
|
video_url: str,
|
|||
|
|
device: Optional[MQTTDevice] = None,
|
|||
|
|
topic_camera_osd: Optional[str] = None,
|
|||
|
|
method_camera_osd: Optional[str] = None,
|
|||
|
|
topic_osd_info: Optional[str] = None,
|
|||
|
|
method_osd_info: Optional[str] = None,
|
|||
|
|
cancel_flag: Optional[asyncio.Event] = None,
|
|||
|
|
frame_queue: asyncio.Queue = None,
|
|||
|
|
timestamp_frame_queue: Queue = None
|
|||
|
|
):
|
|||
|
|
"""
|
|||
|
|
异步读取 RTMP 流帧(优化版:移除帧率控制,优化线程池)
|
|||
|
|
"""
|
|||
|
|
max_retries = 20
|
|||
|
|
retry_delay = 2
|
|||
|
|
pic_count = 0
|
|||
|
|
attempt = 0
|
|||
|
|
time_start = time.time_ns() # 添加开始时间统计
|
|||
|
|
frame_count = 0 # 统计总帧数
|
|||
|
|
|
|||
|
|
if cancel_flag is None:
|
|||
|
|
cancel_flag = asyncio.Event()
|
|||
|
|
|
|||
|
|
# loop = asyncio.get_running_loop()
|
|||
|
|
|
|||
|
|
# 打印初始统计信息
|
|||
|
|
print(f"开始读取RTMP流: {video_url}")
|
|||
|
|
|
|||
|
|
while not cancel_flag.is_set() and attempt < max_retries:
|
|||
|
|
attempt += 1
|
|||
|
|
if cancel_flag.is_set():
|
|||
|
|
logger.info("收到停止信号,终止 RTMP 读取")
|
|||
|
|
break
|
|||
|
|
|
|||
|
|
container = None
|
|||
|
|
try:
|
|||
|
|
logger.info(f"尝试连接 RTMP 流 (尝试 {attempt}/{max_retries}): {video_url}")
|
|||
|
|
# 1. 关键优化:将同步的 av.open 和流初始化放到线程池
|
|||
|
|
container = await loop.run_in_executor(read_rtmp_frames_executor, av.open, video_url)
|
|||
|
|
video_stream = await loop.run_in_executor(read_rtmp_frames_executor, next,
|
|||
|
|
(s for s in container.streams if s.type == 'video'))
|
|||
|
|
logger.info(f"成功连接到 RTMP 流: {video_url} ({video_stream.width}x{video_stream.height})")
|
|||
|
|
|
|||
|
|
# 2. 提前获取一次OSD消息(验证MQTT是否正常)
|
|||
|
|
if device and topic_osd_info and method_osd_info:
|
|||
|
|
osd_msg = device.get_latest_message(topic=topic_osd_info, method=method_osd_info)
|
|||
|
|
if osd_msg:
|
|||
|
|
logger.info(f"初始OSD消息获取成功: 高度={osd_msg.data.height}")
|
|||
|
|
else:
|
|||
|
|
logger.warning("初始OSD消息为空,可能MQTT尚未收到消息")
|
|||
|
|
|
|||
|
|
# 3. 关键优化:将同步的帧迭代放到线程池,通过生成器异步获取
|
|||
|
|
async def async_frame_generator():
|
|||
|
|
"""异步帧生成器:在后台线程迭代同步帧,通过yield返回给事件循环"""
|
|||
|
|
|
|||
|
|
def sync_frame_iter():
|
|||
|
|
try:
|
|||
|
|
for frame in container.decode(video=0):
|
|||
|
|
# 线程内检查取消标志(需定期检查,避免线程无法退出)
|
|||
|
|
if cancel_flag.is_set():
|
|||
|
|
logger.info("后台线程检测到取消信号,停止帧迭代")
|
|||
|
|
break
|
|||
|
|
|
|||
|
|
# 确保是3通道RGB
|
|||
|
|
if len(frame.planes) == 1: # 如果是灰度图
|
|||
|
|
gray = frame.to_ndarray(format='gray')
|
|||
|
|
# 转换为3通道BGR(不修改尺寸)
|
|||
|
|
bgr = cv2.cvtColor(gray, cv2.COLOR_GRAY2BGR)
|
|||
|
|
yield bgr
|
|||
|
|
else:
|
|||
|
|
# 保持原始尺寸和色彩空间,只转换格式
|
|||
|
|
bgr = frame.to_ndarray(format='bgr24')
|
|||
|
|
yield bgr
|
|||
|
|
except Exception as e:
|
|||
|
|
logger.error(f"同步帧迭代出错: {e}")
|
|||
|
|
finally:
|
|||
|
|
if container:
|
|||
|
|
container.close()
|
|||
|
|
logger.info("RTMP容器已关闭")
|
|||
|
|
|
|||
|
|
# 将同步迭代器包装为异步生成器
|
|||
|
|
gen = sync_frame_iter()
|
|||
|
|
while not cancel_flag.is_set():
|
|||
|
|
try:
|
|||
|
|
# 每次获取一帧都通过线程池执行,避免长时间阻塞
|
|||
|
|
frame = await loop.run_in_executor(read_rtmp_frames_executor, next, gen, None)
|
|||
|
|
if frame is None: # 迭代结束
|
|||
|
|
break
|
|||
|
|
yield frame
|
|||
|
|
except StopIteration:
|
|||
|
|
logger.info("RTMP流帧迭代结束")
|
|||
|
|
break
|
|||
|
|
except Exception as e:
|
|||
|
|
logger.error(f"异步获取帧出错: {e}")
|
|||
|
|
break
|
|||
|
|
|
|||
|
|
# 4. 异步迭代帧(不阻塞事件循环)
|
|||
|
|
async for frame in async_frame_generator():
|
|||
|
|
if cancel_flag.is_set():
|
|||
|
|
logger.info("检测到取消信号,停止读取帧")
|
|||
|
|
break
|
|||
|
|
|
|||
|
|
try:
|
|||
|
|
# 5. 帧转换也放到线程池(av.Frame.to_ndarray是CPU密集操作)
|
|||
|
|
img = frame.copy() # 确保不修改原始帧
|
|||
|
|
osd_info = None
|
|||
|
|
|
|||
|
|
# 6. 此时事件循环未被阻塞,MQTT消息已缓存,get_latest_message可即时获取
|
|||
|
|
if device and topic_osd_info and method_osd_info:
|
|||
|
|
osd_msg = device.get_latest_message(topic=topic_osd_info, method=method_osd_info)
|
|||
|
|
if osd_msg and hasattr(osd_msg, 'data'):
|
|||
|
|
osd_info = Air_Attitude(
|
|||
|
|
gimbal_pitch=osd_msg.data.gimbal_pitch,
|
|||
|
|
gimbal_roll=osd_msg.data.gimbal_roll,
|
|||
|
|
gimbal_yaw=osd_msg.data.gimbal_yaw,
|
|||
|
|
height=osd_msg.data.height,
|
|||
|
|
latitude=osd_msg.data.latitude,
|
|||
|
|
longitude=osd_msg.data.longitude
|
|||
|
|
)
|
|||
|
|
# 7. 异步放入帧队列(避免队列满时阻塞)
|
|||
|
|
# if not frame_queue.full():
|
|||
|
|
if True:
|
|||
|
|
# if True:
|
|||
|
|
pic_count += 1
|
|||
|
|
frame_count += 1 # 增加总帧数统计
|
|||
|
|
time_ns = time.time_ns()
|
|||
|
|
|
|||
|
|
# 定期输出统计信息(每1000帧)
|
|||
|
|
if time_ns - time_start > 1000000000:
|
|||
|
|
print(f"readFrames {pic_count}")
|
|||
|
|
pic_count = 0
|
|||
|
|
time_start = time_ns
|
|||
|
|
|
|||
|
|
# # if img is not None and osd_info is not None:
|
|||
|
|
await frame_queue.put((img, osd_info, time_ns))
|
|||
|
|
if not timestamp_frame_queue.full():
|
|||
|
|
print(f"timestamp_frame_queue size {timestamp_frame_queue.qsize()}")
|
|||
|
|
timestamp_frame_queue.put(
|
|||
|
|
{
|
|||
|
|
"timestamp": time_ns,
|
|||
|
|
"frame": img
|
|||
|
|
})
|
|||
|
|
# else:
|
|||
|
|
# queue_size=timestamp_frame_queue.qsize()
|
|||
|
|
# clear_size=queue_size/3
|
|||
|
|
# for i in range(int(clear_size)):
|
|||
|
|
# timestamp_frame_queue.get_nowait() #如果满了就清理1/3 队列
|
|||
|
|
|
|||
|
|
logger.debug(
|
|||
|
|
f"已放入帧队列,累计帧数: {pic_count},队列剩余空间: {frame_queue.maxsize - frame_queue.qsize()}")
|
|||
|
|
else:
|
|||
|
|
logger.warning("帧队列已满,等待1ms后重试")
|
|||
|
|
await asyncio.sleep(0.001)
|
|||
|
|
|
|||
|
|
except Exception as frame_error:
|
|||
|
|
logger.error(f"处理单帧时出错: {frame_error}", exc_info=True)
|
|||
|
|
continue
|
|||
|
|
|
|||
|
|
except (av.AVError, IOError) as e:
|
|||
|
|
logger.error(f"RTMP 流错误 (尝试 {attempt}/{max_retries}): {e}")
|
|||
|
|
if attempt < max_retries:
|
|||
|
|
await asyncio.sleep(retry_delay)
|
|||
|
|
else:
|
|||
|
|
raise RuntimeError(f"无法连接 RTMP 流 (尝试 {max_retries} 次后失败): {video_url}")
|
|||
|
|
except asyncio.CancelledError:
|
|||
|
|
logger.info("read_rtmp_frames 收到取消信号")
|
|||
|
|
raise
|
|||
|
|
except Exception as e:
|
|||
|
|
logger.error(f"未知错误: {e}", exc_info=True)
|
|||
|
|
if attempt < max_retries:
|
|||
|
|
await asyncio.sleep(retry_delay)
|
|||
|
|
finally:
|
|||
|
|
# 双重保险:确保容器关闭
|
|||
|
|
if container and not container.closed:
|
|||
|
|
await loop.run_in_executor(None, container.close)
|
|||
|
|
logger.info("RTMP容器在finally中关闭")
|
|||
|
|
|
|||
|
|
# 最终统计信息
|
|||
|
|
if frame_count > 0:
|
|||
|
|
total_time = (time.time_ns() - time_start) / 1e9
|
|||
|
|
avg_fps = frame_count / total_time if total_time > 0 else 0
|
|||
|
|
print(f"RTMP流读取完成,总帧数: {frame_count}, 总时间: {total_time:.2f}秒, 平均FPS: {avg_fps:.2f}")
|
|||
|
|
else:
|
|||
|
|
print("RTMP流读取失败,未获取到任何帧")
|
|||
|
|
|
|||
|
|
logger.info(f"RTMP 流已结束或被取消,累计处理帧数: {pic_count}")
|
|||
|
|
|
|||
|
|
|
|||
|
|
#
|
|||
|
|
# async def read_rtmp_frames(
|
|||
|
|
# loop,
|
|||
|
|
# read_rtmp_frames_executor: ThreadPoolExecutor,
|
|||
|
|
# video_url: str,
|
|||
|
|
# device: Optional[MQTTDevice] = None,
|
|||
|
|
# topic_camera_osd: Optional[str] = None,
|
|||
|
|
# method_camera_osd: Optional[str] = None,
|
|||
|
|
# topic_osd_info: Optional[str] = None,
|
|||
|
|
# method_osd_info: Optional[str] = None,
|
|||
|
|
# cancel_flag: Optional[asyncio.Event] = None,
|
|||
|
|
# frame_queue: asyncio.Queue = None,
|
|||
|
|
# timestamp_frame_queue: TimestampedQueue = None
|
|||
|
|
# ):
|
|||
|
|
# """
|
|||
|
|
# 基于 FFmpeg 读取 RTMP 流帧(优化版:高性能读取,处理损坏帧)
|
|||
|
|
# """
|
|||
|
|
# print("read_rtmp_frames")
|
|||
|
|
# max_retries = 20
|
|||
|
|
# retry_delay = 2
|
|||
|
|
# pic_count = 0
|
|||
|
|
# attempt = 0
|
|||
|
|
# time_start = time.time_ns()
|
|||
|
|
# frame_count = 0
|
|||
|
|
#
|
|||
|
|
# if cancel_flag is None:
|
|||
|
|
# cancel_flag = asyncio.Event()
|
|||
|
|
#
|
|||
|
|
# print(f"开始读取RTMP流: {video_url}")
|
|||
|
|
#
|
|||
|
|
# while not cancel_flag.is_set() and attempt < max_retries:
|
|||
|
|
# attempt += 1
|
|||
|
|
# if cancel_flag.is_set():
|
|||
|
|
# logger.info("收到停止信号,终止 RTMP 读取")
|
|||
|
|
# break
|
|||
|
|
#
|
|||
|
|
# ffmpeg_process = None
|
|||
|
|
# width, height = None, None
|
|||
|
|
# frame_size = None
|
|||
|
|
#
|
|||
|
|
# try:
|
|||
|
|
# logger.info(f"尝试连接 RTMP 流 (尝试 {attempt}/{max_retries}): {video_url}")
|
|||
|
|
#
|
|||
|
|
# # 1. 探测视频分辨率
|
|||
|
|
# width, height = await detect_video_resolution(loop, read_rtmp_frames_executor, video_url)
|
|||
|
|
#
|
|||
|
|
# if width is None or height is None:
|
|||
|
|
# logger.warning("使用默认分辨率 1920x1080")
|
|||
|
|
# width, height = 1920, 1080
|
|||
|
|
#
|
|||
|
|
# frame_size = width * height * 3
|
|||
|
|
# logger.info(f"视频分辨率: {width}x{height}, 帧大小: {frame_size} bytes")
|
|||
|
|
#
|
|||
|
|
# # 2. 启动 FFmpeg 进程(优化参数提高性能)
|
|||
|
|
# ffmpeg_cmd = [
|
|||
|
|
# 'ffmpeg',
|
|||
|
|
# '-i', video_url,
|
|||
|
|
# '-loglevel', 'warning',
|
|||
|
|
# '-fflags', 'nobuffer',
|
|||
|
|
# '-flags', 'low_delay',
|
|||
|
|
# '-avioflags', 'direct',
|
|||
|
|
# '-threads', '1', # 单线程,减少上下文切换
|
|||
|
|
# '-f', 'image2pipe',
|
|||
|
|
# '-pix_fmt', 'bgr24',
|
|||
|
|
# '-vcodec', 'rawvideo',
|
|||
|
|
# '-'
|
|||
|
|
# ]
|
|||
|
|
#
|
|||
|
|
# ffmpeg_process = await loop.run_in_executor(
|
|||
|
|
# read_rtmp_frames_executor,
|
|||
|
|
# lambda: subprocess.Popen(
|
|||
|
|
# ffmpeg_cmd,
|
|||
|
|
# stdout=subprocess.PIPE,
|
|||
|
|
# stderr=subprocess.PIPE,
|
|||
|
|
# bufsize=frame_size # 设置合适的缓冲区大小
|
|||
|
|
# )
|
|||
|
|
# )
|
|||
|
|
#
|
|||
|
|
# logger.info(f"成功启动 FFmpeg 进程连接 RTMP 流: {video_url}")
|
|||
|
|
#
|
|||
|
|
# # 3. 初始化帧读取状态
|
|||
|
|
# frame_sequence = 0
|
|||
|
|
# last_timestamp = time_start
|
|||
|
|
# consecutive_corrupted_frames = 0 # 连续损坏帧计数
|
|||
|
|
# max_consecutive_corrupted = 10 # 最大连续损坏帧数
|
|||
|
|
#
|
|||
|
|
# while not cancel_flag.is_set():
|
|||
|
|
# try:
|
|||
|
|
# # 直接读取完整帧(高性能方式)
|
|||
|
|
# raw_frame = await loop.run_in_executor(
|
|||
|
|
# read_rtmp_frames_executor,
|
|||
|
|
# lambda: ffmpeg_process.stdout.read(frame_size)
|
|||
|
|
# )
|
|||
|
|
#
|
|||
|
|
# if not raw_frame:
|
|||
|
|
# logger.warning("读取到空帧数据,流可能已结束")
|
|||
|
|
# break
|
|||
|
|
#
|
|||
|
|
# current_time_ns = time.time_ns()
|
|||
|
|
# frame_sequence += 1
|
|||
|
|
# frame_count += 1
|
|||
|
|
#
|
|||
|
|
# # 处理帧数据(无论是否完整)
|
|||
|
|
# img = None
|
|||
|
|
# is_corrupted = False
|
|||
|
|
# # print(f"读取 read_rtmp_frames 判断")
|
|||
|
|
# try:
|
|||
|
|
# if len(raw_frame) == frame_size:
|
|||
|
|
# # 完整帧处理
|
|||
|
|
# frame = np.frombuffer(raw_frame, dtype=np.uint8).reshape((height, width, 3))
|
|||
|
|
# img = frame.copy()
|
|||
|
|
# consecutive_corrupted_frames = 0 # 重置连续损坏计数
|
|||
|
|
# else:
|
|||
|
|
# # 损坏帧处理
|
|||
|
|
# logger.warning(f"帧数据损坏: {len(raw_frame)}/{frame_size} bytes, 序列: {frame_sequence}")
|
|||
|
|
# is_corrupted = True
|
|||
|
|
# consecutive_corrupted_frames += 1
|
|||
|
|
#
|
|||
|
|
# # 创建替代帧
|
|||
|
|
# if consecutive_corrupted_frames <= max_consecutive_corrupted:
|
|||
|
|
# # 尝试部分恢复
|
|||
|
|
# img = np.zeros((height, width, 3), dtype=np.uint8)
|
|||
|
|
# valid_data = min(len(raw_frame), frame_size)
|
|||
|
|
# if valid_data > 0:
|
|||
|
|
# # 尽可能填充有效数据
|
|||
|
|
# temp_frame = np.frombuffer(raw_frame[:valid_data], dtype=np.uint8)
|
|||
|
|
# img.flat[:len(temp_frame)] = temp_frame
|
|||
|
|
# else:
|
|||
|
|
# # 连续损坏过多,创建空白帧
|
|||
|
|
# img = np.zeros((height, width, 3), dtype=np.uint8)
|
|||
|
|
# logger.error(f"连续损坏帧过多 ({consecutive_corrupted_frames}),创建空白帧")
|
|||
|
|
#
|
|||
|
|
# except Exception as frame_error:
|
|||
|
|
# logger.error(f"帧数据处理错误: {frame_error}")
|
|||
|
|
# # 创建空白帧作为后备
|
|||
|
|
# img = np.zeros((height, width, 3), dtype=np.uint8)
|
|||
|
|
# is_corrupted = True
|
|||
|
|
# consecutive_corrupted_frames += 1
|
|||
|
|
# # print(f"读取 read_rtmp_frames 判断1")
|
|||
|
|
# # 获取OSD信息
|
|||
|
|
# osd_info = None
|
|||
|
|
# if device and topic_osd_info and method_osd_info:
|
|||
|
|
# try:
|
|||
|
|
# osd_msg = device.get_latest_message(topic=topic_osd_info, method=method_osd_info)
|
|||
|
|
# if osd_msg and hasattr(osd_msg, 'data'):
|
|||
|
|
# osd_info = Air_Attitude(
|
|||
|
|
# gimbal_pitch=osd_msg.data.gimbal_pitch,
|
|||
|
|
# gimbal_roll=osd_msg.data.gimbal_roll,
|
|||
|
|
# gimbal_yaw=osd_msg.data.gimbal_yaw,
|
|||
|
|
# height=osd_msg.data.height,
|
|||
|
|
# latitude=osd_msg.data.latitude,
|
|||
|
|
# longitude=osd_msg.data.longitude
|
|||
|
|
# )
|
|||
|
|
# except Exception as osd_error:
|
|||
|
|
# logger.warning(f"获取OSD信息失败: {osd_error}")
|
|||
|
|
# # print(f"读取 read_rtmp_frames 判断2")
|
|||
|
|
# # 放入帧队列
|
|||
|
|
# # if img is not None and not frame_queue.full():
|
|||
|
|
# if True:
|
|||
|
|
# # 确保时间戳递增
|
|||
|
|
# if current_time_ns <= last_timestamp:
|
|||
|
|
# current_time_ns = last_timestamp + 1
|
|||
|
|
# last_timestamp = current_time_ns
|
|||
|
|
#
|
|||
|
|
# # 统计信息
|
|||
|
|
# pic_count += 1
|
|||
|
|
# if current_time_ns - time_start > 1000000000: # 1秒
|
|||
|
|
# elapsed_seconds = (current_time_ns - time_start) / 1e9
|
|||
|
|
# fps = pic_count / elapsed_seconds if elapsed_seconds > 0 else 0
|
|||
|
|
# corrupted_rate = (consecutive_corrupted_frames / pic_count * 100) if pic_count > 0 else 0
|
|||
|
|
# print(
|
|||
|
|
# f"readFrames 序列:{frame_sequence} 帧数:{pic_count} FPS:{fps:.2f} 损坏率:{corrupted_rate:.1f}%")
|
|||
|
|
# pic_count = 0
|
|||
|
|
# time_start = current_time_ns
|
|||
|
|
#
|
|||
|
|
# # print(f"读取 read_rtmp_frames 实时流")
|
|||
|
|
#
|
|||
|
|
# # # 准备帧数据
|
|||
|
|
# # frame_data = {
|
|||
|
|
# # "sequence": frame_sequence,
|
|||
|
|
# # "frame": img,
|
|||
|
|
# # "osd_info": osd_info,
|
|||
|
|
# # "timestamp": current_time_ns,
|
|||
|
|
# # "is_corrupted": is_corrupted
|
|||
|
|
# # }
|
|||
|
|
# time_ns = time.time_ns()
|
|||
|
|
# # 放入队列
|
|||
|
|
# await frame_queue.put((img, osd_info, time_ns))
|
|||
|
|
# timestamp_frame_queue.append({
|
|||
|
|
# "timestamp": time_ns,
|
|||
|
|
# "frame": img
|
|||
|
|
# })
|
|||
|
|
#
|
|||
|
|
# if frame_sequence % 100 == 0: # 每100帧输出一次日志
|
|||
|
|
# logger.debug(f"已处理帧 序列:{frame_sequence} 累计:{frame_count}")
|
|||
|
|
#
|
|||
|
|
# elif frame_queue.full():
|
|||
|
|
# logger.warning("帧队列已满,跳过此帧")
|
|||
|
|
# await asyncio.sleep(0.001) # 短暂等待
|
|||
|
|
#
|
|||
|
|
# # 检查是否需要重新探测分辨率(仅在连续损坏时)
|
|||
|
|
# if consecutive_corrupted_frames > 5:
|
|||
|
|
# logger.warning("连续帧损坏,尝试重新探测分辨率")
|
|||
|
|
# try:
|
|||
|
|
# new_width, new_height = await detect_video_resolution(loop, read_rtmp_frames_executor,
|
|||
|
|
# video_url)
|
|||
|
|
# if new_width and new_height and (new_width != width or new_height != height):
|
|||
|
|
# logger.info(f"分辨率变化: {width}x{height} -> {new_width}x{new_height}")
|
|||
|
|
# width, height = new_width, new_height
|
|||
|
|
# frame_size = width * height * 3
|
|||
|
|
# consecutive_corrupted_frames = 0 # 重置计数
|
|||
|
|
# except Exception as probe_error:
|
|||
|
|
# logger.warning(f"重新探测分辨率失败: {probe_error}")
|
|||
|
|
#
|
|||
|
|
# except Exception as e:
|
|||
|
|
# logger.error(f"读取帧数据时出错: {e}", exc_info=True)
|
|||
|
|
# # 检查 FFmpeg 进程状态
|
|||
|
|
# if ffmpeg_process and ffmpeg_process.poll() is not None:
|
|||
|
|
# try:
|
|||
|
|
# stderr_output = ffmpeg_process.stderr.read().decode('utf-8', errors='ignore')
|
|||
|
|
# if stderr_output:
|
|||
|
|
# logger.error(f"FFmpeg 进程错误: {stderr_output}")
|
|||
|
|
# except:
|
|||
|
|
# pass
|
|||
|
|
# logger.error("FFmpeg 进程已退出")
|
|||
|
|
# break
|
|||
|
|
# # 短暂等待后继续
|
|||
|
|
# await asyncio.sleep(0.01)
|
|||
|
|
# continue
|
|||
|
|
#
|
|||
|
|
# except (subprocess.SubprocessError, IOError) as e:
|
|||
|
|
# logger.error(f"RTMP 流错误 (尝试 {attempt}/{max_retries}): {e}")
|
|||
|
|
# if attempt < max_retries:
|
|||
|
|
# await asyncio.sleep(retry_delay)
|
|||
|
|
# else:
|
|||
|
|
# raise RuntimeError(f"无法连接 RTMP 流 (尝试 {max_retries} 次后失败): {video_url}")
|
|||
|
|
# except asyncio.CancelledError:
|
|||
|
|
# logger.info("read_rtmp_frames 收到取消信号")
|
|||
|
|
# raise
|
|||
|
|
# except Exception as e:
|
|||
|
|
# logger.error(f"未知错误: {e}", exc_info=True)
|
|||
|
|
# if attempt < max_retries:
|
|||
|
|
# await asyncio.sleep(retry_delay)
|
|||
|
|
# finally:
|
|||
|
|
# if ffmpeg_process:
|
|||
|
|
# try:
|
|||
|
|
# ffmpeg_process.terminate()
|
|||
|
|
# try:
|
|||
|
|
# await asyncio.wait_for(
|
|||
|
|
# loop.run_in_executor(read_rtmp_frames_executor, ffmpeg_process.wait),
|
|||
|
|
# timeout=2.0
|
|||
|
|
# )
|
|||
|
|
# except asyncio.TimeoutError:
|
|||
|
|
# ffmpeg_process.kill()
|
|||
|
|
# await loop.run_in_executor(read_rtmp_frames_executor, ffmpeg_process.wait)
|
|||
|
|
# except Exception as e:
|
|||
|
|
# logger.warning(f"关闭FFmpeg进程时出错: {e}")
|
|||
|
|
# logger.info("FFmpeg 进程已关闭")
|
|||
|
|
#
|
|||
|
|
# if frame_count > 0:
|
|||
|
|
# total_time = (time.time_ns() - time_start) / 1e9
|
|||
|
|
# avg_fps = frame_count / total_time if total_time > 0 else 0
|
|||
|
|
# print(f"RTMP流读取完成,总帧数: {frame_count}, 总时间: {total_time:.2f}秒, 平均FPS: {avg_fps:.2f}")
|
|||
|
|
# else:
|
|||
|
|
# print("RTMP流读取失败,未获取到任何帧")
|
|||
|
|
#
|
|||
|
|
# logger.info(f"RTMP 流已结束或被取消,累计处理帧数: {frame_count}")
|
|||
|
|
|
|||
|
|
|
|||
|
|
#
|
|||
|
|
# async def high_speed_process_frames(detector: MultiYoloTrtDetectorTrackId_TRT8, cancel_flag: asyncio.Event,
|
|||
|
|
# frame_queue: asyncio.Queue, processed_queue: asyncio.Queue,
|
|||
|
|
# performance_counter):
|
|||
|
|
# """高速处理帧 - 保持原有速度"""
|
|||
|
|
# frame_count = 0
|
|||
|
|
# start_time = time.time()
|
|||
|
|
# last_log_time = start_time
|
|||
|
|
#
|
|||
|
|
# # 检查CPU状态
|
|||
|
|
# check_cpu_affinity()
|
|||
|
|
#
|
|||
|
|
# # 设置高性能模式
|
|||
|
|
# logger.info("[ProcessA] 进入高速推理模式")
|
|||
|
|
#
|
|||
|
|
# # 预加载图片
|
|||
|
|
# # test_image = cv2.imread(r"/mnt/mydisk1/dj/ai2/middleware/20250711-111516-531_1761271860531810831.jpg")
|
|||
|
|
# test_image = cv2.imread(r"E:\yolo-dataset\image\0000006_00611_d_0000002.jpg")
|
|||
|
|
# if test_image is None:
|
|||
|
|
# logger.error("[ProcessA] 无法加载测试图片")
|
|||
|
|
# return
|
|||
|
|
#
|
|||
|
|
# # 使用高速循环
|
|||
|
|
# batch_size = 4
|
|||
|
|
# batch_frames = []
|
|||
|
|
# batch_timestamps = []
|
|||
|
|
#
|
|||
|
|
# while not cancel_flag.is_set():
|
|||
|
|
# try:
|
|||
|
|
# # 高速处理,不添加额外延迟
|
|||
|
|
# current_time = time.time_ns()
|
|||
|
|
#
|
|||
|
|
# # 添加到批次
|
|||
|
|
# batch_frames.append(test_image.copy())
|
|||
|
|
# batch_timestamps.append(current_time)
|
|||
|
|
# frame_count += 1
|
|||
|
|
#
|
|||
|
|
# # 当批次满时进行推理
|
|||
|
|
# if len(batch_frames) >= batch_size:
|
|||
|
|
# try:
|
|||
|
|
# # 批量推理
|
|||
|
|
# time_pr_start = time.time_ns()
|
|||
|
|
#
|
|||
|
|
# # 批量处理每个帧
|
|||
|
|
# batch_results = []
|
|||
|
|
# for frame in batch_frames:
|
|||
|
|
# detections, detections_list, model_para = await detector.predict(frame)
|
|||
|
|
# batch_results.append({
|
|||
|
|
# 'detections': detections,
|
|||
|
|
# 'detections_list': detections_list,
|
|||
|
|
# 'model_para': model_para
|
|||
|
|
# })
|
|||
|
|
#
|
|||
|
|
# time_pr_end = time.time_ns()
|
|||
|
|
# avg_inference_time = (time_pr_end - time_pr_start) / len(batch_frames) / 1000000
|
|||
|
|
#
|
|||
|
|
# # 批量放入队列
|
|||
|
|
# for i, (frame, result, timestamp) in enumerate(zip(batch_frames, batch_results, batch_timestamps)):
|
|||
|
|
# processed_data = {
|
|||
|
|
# 'frame': frame,
|
|||
|
|
# 'osd_info': None,
|
|||
|
|
# 'detections': result['detections'],
|
|||
|
|
# 'detections_list': result['detections_list'],
|
|||
|
|
# 'timestamp': timestamp,
|
|||
|
|
# 'model_para': result['model_para'],
|
|||
|
|
# 'predict_state': bool(result['detections'])
|
|||
|
|
# }
|
|||
|
|
#
|
|||
|
|
# if not processed_queue.full():
|
|||
|
|
# await processed_queue.put(processed_data)
|
|||
|
|
#
|
|||
|
|
# # 性能统计
|
|||
|
|
# current_time_log = time.time()
|
|||
|
|
# if current_time_log - last_log_time >= 1.0: # 每秒输出一次
|
|||
|
|
# total_elapsed = current_time_log - start_time
|
|||
|
|
# fps = frame_count / total_elapsed
|
|||
|
|
# logger.info(
|
|||
|
|
# f"[ProcessA] 推理速度: {fps:.1f} FPS, 累计: {frame_count}帧, 平均推理: {avg_inference_time:.2f}ms")
|
|||
|
|
# if performance_counter:
|
|||
|
|
# performance_counter.value = frame_count
|
|||
|
|
# last_log_time = current_time_log
|
|||
|
|
#
|
|||
|
|
# except Exception as e:
|
|||
|
|
# logger.error(f"[ProcessA] 批量推理错误: {e}")
|
|||
|
|
#
|
|||
|
|
# # 清空批次
|
|||
|
|
# batch_frames.clear()
|
|||
|
|
# batch_timestamps.clear()
|
|||
|
|
#
|
|||
|
|
# # 微小延迟避免CPU 100%
|
|||
|
|
# await asyncio.sleep(0.0001)
|
|||
|
|
#
|
|||
|
|
# except asyncio.CancelledError:
|
|||
|
|
# break
|
|||
|
|
# except Exception as e:
|
|||
|
|
# logger.error(f"[ProcessA] 处理错误: {e}")
|
|||
|
|
# await asyncio.sleep(0.01)
|
|||
|
|
#
|
|||
|
|
# logger.info(f"[ProcessA] 推理结束,总帧数: {frame_count}")
|
|||
|
|
|
|||
|
|
async def high_speed_process_frames(detector: MultiYoloTrtDetectorTrackId_TRT10_YOLO11, cancel_flag: asyncio.Event,
|
|||
|
|
frame_queue: asyncio.Queue, processed_queue: asyncio.Queue):
|
|||
|
|
"""协程处理帧队列"""
|
|||
|
|
start_time = time.time()
|
|||
|
|
time_start = time.time_ns()
|
|||
|
|
pic_count = 0
|
|||
|
|
while not cancel_flag.is_set():
|
|||
|
|
frame_start = time.time()
|
|||
|
|
try:
|
|||
|
|
frame, osd_info, timestamp = await asyncio.wait_for(
|
|||
|
|
frame_queue.get(),
|
|||
|
|
timeout=0.5 # 延长超时,适配处理耗时
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
try:
|
|||
|
|
time_pr_start = time.time_ns()
|
|||
|
|
detections, detections_list, model_para = await detector.predict(frame)
|
|||
|
|
time_pr_end = time.time_ns()
|
|||
|
|
print(f"time_pr_starttime_pr_start {(time_pr_end - time_pr_start) / 1000000}")
|
|||
|
|
predict_state = True
|
|||
|
|
if detections:
|
|||
|
|
print("检测到任何目标")
|
|||
|
|
if not detections:
|
|||
|
|
predict_state = False
|
|||
|
|
logger.debug("未检测到任何目标")
|
|||
|
|
|
|||
|
|
# continue
|
|||
|
|
|
|||
|
|
# # # # 显示帧用于调试(可选)
|
|||
|
|
# cv2.imshow('process_frames', frame)
|
|||
|
|
# if cv2.waitKey(1) & 0xFF == ord('q'):
|
|||
|
|
# stop_event.set()
|
|||
|
|
# break
|
|||
|
|
|
|||
|
|
processed_data = {
|
|||
|
|
'frame': frame,
|
|||
|
|
'osd_info': osd_info,
|
|||
|
|
'detections': detections,
|
|||
|
|
'detections_list': detections_list,
|
|||
|
|
'timestamp': timestamp,
|
|||
|
|
'model_para': model_para,
|
|||
|
|
'predict_state': predict_state # predict状态判定,方便rtmp推流做状态判定
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
if not processed_queue.full():
|
|||
|
|
time_end = time.time_ns()
|
|||
|
|
pic_count = pic_count + 1
|
|||
|
|
if time_end - time_start > 1000000000:
|
|||
|
|
print(f"processframes {pic_count}")
|
|||
|
|
pic_count = 0
|
|||
|
|
time_start = time_end
|
|||
|
|
|
|||
|
|
await processed_queue.put(processed_data)
|
|||
|
|
else:
|
|||
|
|
logger.warning("处理队列已满,丢弃帧")
|
|||
|
|
if 'frame' in locals():
|
|||
|
|
frame_time = time.time() - frame_start
|
|||
|
|
# print(f"处理帧耗时: {frame_time:.4f}s")
|
|||
|
|
except Exception as e:
|
|||
|
|
logger.error(f"处理帧时出错: {e}", exc_info=True)
|
|||
|
|
await asyncio.sleep(0.1)
|
|||
|
|
|
|||
|
|
except asyncio.TimeoutError:
|
|||
|
|
continue
|
|||
|
|
except asyncio.CancelledError:
|
|||
|
|
print("process_frames 收到取消信号")
|
|||
|
|
raise
|
|||
|
|
except Exception as e:
|
|||
|
|
logger.error(f"获取帧时发生意外错误: {e}", exc_info=True)
|
|||
|
|
await asyncio.sleep(0.1)
|
|||
|
|
# finally:
|
|||
|
|
# log_perf('process_frames', start_time)
|
|||
|
|
print("process_frames读取线程已停止")
|
|||
|
|
|
|||
|
|
|
|||
|
|
# async def write_results_to_rtmp_high_speed(
|
|||
|
|
# task_id: str,
|
|||
|
|
# output_url: str = None,
|
|||
|
|
# input_fps: float = None,
|
|||
|
|
# list_points: List[List[Any]] = None,
|
|||
|
|
# camera_para: Camera_Para = None,
|
|||
|
|
# invade_state: bool = False,
|
|||
|
|
# cancel_flag: asyncio.Event = None,
|
|||
|
|
# processed_queue: asyncio.Queue = None,
|
|||
|
|
# invade_queue: Queue = None,
|
|||
|
|
# cv_frame_queue: asyncio.Queue = None,
|
|||
|
|
# cv_process_queue: Optional[Queue] = None,
|
|||
|
|
# event_process_queue: Optional[Queue] = None,
|
|||
|
|
# stream_containers: Dict[str, Any] = None
|
|||
|
|
# ):
|
|||
|
|
# """高速推流处理"""
|
|||
|
|
# frame_count = 0
|
|||
|
|
# start_time = time.time()
|
|||
|
|
# last_log_time = start_time
|
|||
|
|
#
|
|||
|
|
# logger.info(f"[ProcessA] 推流任务启动 - 高速模式")
|
|||
|
|
#
|
|||
|
|
# try:
|
|||
|
|
# while not cancel_flag.is_set():
|
|||
|
|
# try:
|
|||
|
|
# # 高速获取数据
|
|||
|
|
# processed_data = await asyncio.wait_for(
|
|||
|
|
# processed_queue.get(),
|
|||
|
|
# timeout=0.01
|
|||
|
|
# )
|
|||
|
|
#
|
|||
|
|
# frame_count += 1
|
|||
|
|
#
|
|||
|
|
# # 放入进程间队列
|
|||
|
|
# if cv_process_queue is not None and not cv_process_queue.full():
|
|||
|
|
# try:
|
|||
|
|
# # 简化数据,只放关键信息
|
|||
|
|
# simplified_data = {
|
|||
|
|
# 'frame_count': frame_count,
|
|||
|
|
# 'timestamp': processed_data.get('timestamp'),
|
|||
|
|
# 'predict_state': processed_data.get('predict_state', False)
|
|||
|
|
# }
|
|||
|
|
# cv_process_queue.put_nowait(simplified_data)
|
|||
|
|
# except queue.Full:
|
|||
|
|
# pass
|
|||
|
|
#
|
|||
|
|
# # 性能统计
|
|||
|
|
# current_time = time.time()
|
|||
|
|
# if current_time - last_log_time >= 1.0: # 每秒输出一次
|
|||
|
|
# elapsed = current_time - start_time
|
|||
|
|
# fps = frame_count / elapsed
|
|||
|
|
# logger.info(f"[ProcessA] 推流处理: {frame_count}帧, FPS: {fps:.1f}")
|
|||
|
|
# last_log_time = current_time
|
|||
|
|
#
|
|||
|
|
# except asyncio.TimeoutError:
|
|||
|
|
# # 队列为空,继续循环
|
|||
|
|
# await asyncio.sleep(0.001)
|
|||
|
|
# continue
|
|||
|
|
# except asyncio.CancelledError:
|
|||
|
|
# break
|
|||
|
|
# except Exception as e:
|
|||
|
|
# logger.error(f"[ProcessA] 推流错误: {e}")
|
|||
|
|
#
|
|||
|
|
# except Exception as e:
|
|||
|
|
# logger.error(f"[ProcessA] 推流主循环异常: {e}")
|
|||
|
|
#
|
|||
|
|
# finally:
|
|||
|
|
# logger.info(f"[ProcessA] 推流结束,总帧数: {frame_count}")
|
|||
|
|
#
|
|||
|
|
#
|
|||
|
|
# async def write_results_to_rtmp_high_speed(task_id: str, output_url: str = None, input_fps: float = None,
|
|||
|
|
# list_points: list[list[any]] = None, camera_para: Camera_Para = None,
|
|||
|
|
# invade_state: bool = False, cancel_flag: asyncio.Event = None,
|
|||
|
|
# cv_frame_thread_queue: Queue = None,
|
|||
|
|
# processed_queue: asyncio.Queue = None,
|
|||
|
|
# invade_queue: asyncio.Queue = None,
|
|||
|
|
# cv_frame_queue: asyncio.Queue = None,
|
|||
|
|
# event_queue: Optional[Queue] = None,
|
|||
|
|
# stream_containers: Dict[str, Any] = None,
|
|||
|
|
# ):
|
|||
|
|
# # global stream_containers, count_pic
|
|||
|
|
# start_time = time.time()
|
|||
|
|
# time_start = time.time_ns()
|
|||
|
|
# pic_count = 0
|
|||
|
|
# # 修改推流参数
|
|||
|
|
# options = {
|
|||
|
|
# 'preset': 'veryfast',
|
|||
|
|
# 'tune': 'zerolatency',
|
|||
|
|
# 'crf': '23',
|
|||
|
|
# 'g': '50', # 关键帧间隔
|
|||
|
|
# 'threads': '2', # 限制编码线程
|
|||
|
|
# }
|
|||
|
|
# codec_name = 'libx264'
|
|||
|
|
#
|
|||
|
|
# max_retries = 3
|
|||
|
|
# retry_delay = 2.0
|
|||
|
|
#
|
|||
|
|
# # 初始化视频输出
|
|||
|
|
# output_video_path = None
|
|||
|
|
# video_writer = None
|
|||
|
|
# frame_width, frame_height = None, None
|
|||
|
|
# fps = input_fps or Config.TARGET_FPS
|
|||
|
|
# last_frame_time = time.time() - 1
|
|||
|
|
# frame_interval = 1.0 / fps
|
|||
|
|
# try:
|
|||
|
|
# while not cancel_flag.is_set():
|
|||
|
|
# frame_start = time.time()
|
|||
|
|
# try:
|
|||
|
|
# # 第一层帧率控制
|
|||
|
|
# # current_time = time.time()
|
|||
|
|
# #
|
|||
|
|
# # time_diff = frame_interval - (current_time - last_frame_time)
|
|||
|
|
# # if time_diff > 0:
|
|||
|
|
# # await asyncio.sleep(time_diff)
|
|||
|
|
# # last_frame_time = current_time
|
|||
|
|
#
|
|||
|
|
# processed_data = await asyncio.wait_for(
|
|||
|
|
# processed_queue.get(),
|
|||
|
|
# timeout=1
|
|||
|
|
# )
|
|||
|
|
#
|
|||
|
|
# # 确保 processed_data 是字典
|
|||
|
|
# if not isinstance(processed_data, dict):
|
|||
|
|
# print(f"❌ 错误:processed_data 不是字典,而是 {type(processed_data)}")
|
|||
|
|
# continue
|
|||
|
|
#
|
|||
|
|
# frame = processed_data['frame']
|
|||
|
|
# # 绘制检测结果
|
|||
|
|
# frame_copy = frame.copy()
|
|||
|
|
# predict_state = processed_data['predict_state']
|
|||
|
|
# osd_info = processed_data['osd_info']
|
|||
|
|
# img_height, img_width = frame.shape[:2]
|
|||
|
|
#
|
|||
|
|
# results = []
|
|||
|
|
# results_list = []
|
|||
|
|
# # 启用侵限且拿到了飞机的姿态信息,再绘制红线
|
|||
|
|
# if invade_state and osd_info:
|
|||
|
|
# gimbal_yaw = osd_info.gimbal_yaw
|
|||
|
|
# gimbal_pitch = osd_info.gimbal_pitch
|
|||
|
|
# gimbal_roll = osd_info.gimbal_roll
|
|||
|
|
# height = osd_info.height
|
|||
|
|
# # print(f"heightheightheight {height}")
|
|||
|
|
# cam_longitude = osd_info.longitude
|
|||
|
|
# cam_latitude = osd_info.latitude
|
|||
|
|
# # 当前list_points 虽然是二维数组,但是只存了一个,后续根据业务变化
|
|||
|
|
#
|
|||
|
|
# for points in list_points:
|
|||
|
|
# # 批量返回图像的像素坐标
|
|||
|
|
# point_list = []
|
|||
|
|
# results = red_line_reproject(gimbal_yaw, gimbal_pitch, gimbal_roll, height, cam_longitude,
|
|||
|
|
# cam_latitude,
|
|||
|
|
# img_width,
|
|||
|
|
# img_height, points, camera_para)
|
|||
|
|
# if results:
|
|||
|
|
# results_list.append(results) # 支持两个区域,高压侵限、营业线侵限
|
|||
|
|
# for point in results:
|
|||
|
|
# point_list.append([point["u"], point["v"]])
|
|||
|
|
# cv2.polylines(frame_copy, [np.array(point_list, dtype=np.int64)], isClosed=True,
|
|||
|
|
# color=(0, 0, 255),
|
|||
|
|
# thickness=2)
|
|||
|
|
# # print(f"predict_statepredict_state {predict_state}")
|
|||
|
|
# # 模型输出了推理结果
|
|||
|
|
# if predict_state:
|
|||
|
|
# # 测试代码,用做测试推理结果,初始化视频写入器(如果尚未初始化)
|
|||
|
|
# if video_writer is None and output_video_path:
|
|||
|
|
# frame_width = frame.shape[1]
|
|||
|
|
# frame_height = frame.shape[0]
|
|||
|
|
# # 定义视频编码器和输出文件
|
|||
|
|
# fourcc = cv2.VideoWriter_fourcc(*'mp4v') # 或者使用 'avc1' 等其他编码
|
|||
|
|
# # fps = Config.TARGET_FPS # 使用配置中的目标帧率
|
|||
|
|
# video_writer = cv2.VideoWriter(
|
|||
|
|
# output_video_path,
|
|||
|
|
# fourcc,
|
|||
|
|
# fps,
|
|||
|
|
# (frame_width, frame_height)
|
|||
|
|
# )
|
|||
|
|
# print(f"视频写入器已初始化,分辨率: {frame_width}x{frame_height}, FPS: {fps}")
|
|||
|
|
#
|
|||
|
|
# detections = processed_data['detections']
|
|||
|
|
# detections_list = processed_data['detections_list']
|
|||
|
|
# model_para = processed_data['model_para']
|
|||
|
|
#
|
|||
|
|
# class_names = model_para[0]["model_class_names"]
|
|||
|
|
# chinese_label = model_para[0]["model_chinese_labe"]
|
|||
|
|
# cls_map = model_para[0]["cls_map"]
|
|||
|
|
# para_invade_enable = model_para[0]["para_invade_enable"]
|
|||
|
|
# model_list_func_id = model_para[0]["model_list_func_id"]
|
|||
|
|
# model_func_id = model_para[0]["func_id"]
|
|||
|
|
# invade_point = []
|
|||
|
|
# message_point = []
|
|||
|
|
# target_point = [] # 存储满足条件的图像坐标,方便后续经纬度转换
|
|||
|
|
# cls_count = 0
|
|||
|
|
#
|
|||
|
|
# # # 初始化统计字典
|
|||
|
|
# class_stats = defaultdict(int)
|
|||
|
|
# reversed_dict = {value: 0 for value in cls_map.values()}
|
|||
|
|
# bg_color = (173, 216, 230) # 文本框底色使用淡蓝色
|
|||
|
|
# text_color = (0, 255, 0) # 绿色
|
|||
|
|
#
|
|||
|
|
# for det in detections:
|
|||
|
|
# x1, y1, x2, y2 = map(int, det.bbox) # 确保坐标是整数
|
|||
|
|
# cls_id = det.class_id # 假设Detection对象有class_id属性
|
|||
|
|
# class_name = det.class_name
|
|||
|
|
# confidence = det.confidence
|
|||
|
|
# track_id = det.track_id
|
|||
|
|
# new_track_id = track_id * 100 + cls_id # 类型小于100或者为负数
|
|||
|
|
# # 更新统计
|
|||
|
|
# class_stats[cls_id] += 1
|
|||
|
|
# # 如果开起侵限功能,就只显示侵限内的框
|
|||
|
|
# point_x = (x1 + x2) / 2
|
|||
|
|
# point_y = (y1 + y2) / 2
|
|||
|
|
# # print(f"class_name--{class_name}")
|
|||
|
|
# # print(f"model_class_names: {model_para[0]['model_class_names']}")
|
|||
|
|
#
|
|||
|
|
# if class_name not in model_para[0]["model_class_names"]:
|
|||
|
|
# continue
|
|||
|
|
#
|
|||
|
|
# en_name = model_para[0]["model_chinese_labe"][
|
|||
|
|
# model_para[0]["model_class_names"].index(class_name)]
|
|||
|
|
# if invade_state:
|
|||
|
|
# # 同时适配多个区域的侵限判断
|
|||
|
|
# is_invade = is_point_in_polygonlist(point_x, point_y, results_list)
|
|||
|
|
# # is_invade = is_point_in_polygon(point_x, point_y, results)
|
|||
|
|
# # print(f"is_invadeis_invadeis_invade {is_invade} {len(results)}")
|
|||
|
|
# if is_invade:
|
|||
|
|
# cls_count += 1
|
|||
|
|
# invade_point.append({
|
|||
|
|
# "u": point_x,
|
|||
|
|
# "v": point_y,
|
|||
|
|
# "class_name": class_name
|
|||
|
|
# })
|
|||
|
|
# target_point.append({
|
|||
|
|
# "u": point_x,
|
|||
|
|
# "v": point_y,
|
|||
|
|
# "cls_id": cls_id,
|
|||
|
|
# "track_id": track_id,
|
|||
|
|
# "new_track_id": new_track_id
|
|||
|
|
# }) # 对于侵限,只存储侵限目标
|
|||
|
|
# # model_list_func_id = model_para[0]["model_list_func_id"]
|
|||
|
|
# # model_func_id = model_para[0]["func_id"]
|
|||
|
|
#
|
|||
|
|
# message_point.append({
|
|||
|
|
# "confidence": float(confidence),
|
|||
|
|
# "cls_id": cls_id,
|
|||
|
|
# "type_name": en_name,
|
|||
|
|
# "track_id": track_id,
|
|||
|
|
# "box": [x1, y1, x2, y2]
|
|||
|
|
# })
|
|||
|
|
# label = f"{en_name}:{confidence:.2f}:{track_id}"
|
|||
|
|
# # 计算文本位置
|
|||
|
|
# text_size = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, fontScale=8, thickness=4)[
|
|||
|
|
# 0]
|
|||
|
|
# text_width, text_height = text_size[0], text_size[1]
|
|||
|
|
# text_x = x1
|
|||
|
|
# text_y = y1 - 5
|
|||
|
|
#
|
|||
|
|
# # 如果文本超出图像顶部,则放在框内部下方
|
|||
|
|
# if text_y < 0:
|
|||
|
|
# text_y = y2 + text_height + 5
|
|||
|
|
# temp_img = frame_copy.copy()
|
|||
|
|
# frame_copy = put_chinese_text(
|
|||
|
|
# temp_img,
|
|||
|
|
# # label, # 置信度、类别、用作测试
|
|||
|
|
# "", # 注释掉汉字
|
|||
|
|
# (text_x, text_y - text_height),
|
|||
|
|
# )
|
|||
|
|
# else:
|
|||
|
|
# cls_count += 1
|
|||
|
|
# # 绘制边界框
|
|||
|
|
# cv2.rectangle(frame_copy, (x1, y1), (x2, y2), (0, 255, 255), 2)
|
|||
|
|
# message_point.append({
|
|||
|
|
# "confidence": float(confidence),
|
|||
|
|
# "cls_id": cls_id,
|
|||
|
|
# "type_name": en_name,
|
|||
|
|
# "track_id": track_id,
|
|||
|
|
# "box": [x1, y1, x2, y2]
|
|||
|
|
# })
|
|||
|
|
# target_point.append({
|
|||
|
|
# "u": point_x,
|
|||
|
|
# "v": point_y,
|
|||
|
|
# "cls_id": cls_id,
|
|||
|
|
# "track_id": track_id,
|
|||
|
|
# "new_track_id": new_track_id
|
|||
|
|
# }) # 对于侵限,只存储侵限目标
|
|||
|
|
# # 准备标签文本
|
|||
|
|
# # label = f"{chinese_label.get(cls_id, class_name)}: {confidence:.2f}:{track_id}"
|
|||
|
|
# label = f"{confidence:.2f}:{track_id}"
|
|||
|
|
# # 计算文本位置
|
|||
|
|
# text_size = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, fontScale=8, thickness=8)[0]
|
|||
|
|
# text_width, text_height = text_size[0], text_size[1]
|
|||
|
|
# text_x = x1
|
|||
|
|
# text_y = y1 - 5
|
|||
|
|
# # 如果文本超出图像顶部,则放在框内部下方
|
|||
|
|
# if text_y < 0:
|
|||
|
|
# text_y = y2 + text_height + 5
|
|||
|
|
#
|
|||
|
|
# # 绘制文本背景
|
|||
|
|
# padding = 2
|
|||
|
|
# temp_img = frame_copy.copy()
|
|||
|
|
# frame_copy = put_chinese_text(
|
|||
|
|
# temp_img,
|
|||
|
|
# # label, # 置信度、类别、用作测试
|
|||
|
|
# "", # 注释掉汉字
|
|||
|
|
# (text_x, text_y - text_height),
|
|||
|
|
# )
|
|||
|
|
#
|
|||
|
|
# if invade_state:
|
|||
|
|
# for point in message_point:
|
|||
|
|
# cv2.rectangle(frame_copy, (point["box"][0], point["box"][1]),
|
|||
|
|
# (point["box"][2], point["box"][3]),
|
|||
|
|
# (0, 255, 255), 2)
|
|||
|
|
# # 画红线
|
|||
|
|
# # 在左上角显示统计结果
|
|||
|
|
# stats_text = []
|
|||
|
|
# for cls_id, count in class_stats.items():
|
|||
|
|
# cls_name = chinese_label.get(cls_id,
|
|||
|
|
# class_names[cls_id] if class_names and cls_id < len(
|
|||
|
|
# class_names) else str(
|
|||
|
|
# cls_id))
|
|||
|
|
# reversed_dict[cls_name] = count
|
|||
|
|
# for key, value in reversed_dict.items():
|
|||
|
|
# stats_text.append(f"{key}: {value}")
|
|||
|
|
#
|
|||
|
|
# if stats_text:
|
|||
|
|
# # 计算统计文本的总高度
|
|||
|
|
# text_height = cv2.getTextSize("Test", cv2.FONT_HERSHEY_SIMPLEX, fontScale=0.5, thickness=1)[0][
|
|||
|
|
# 1]
|
|||
|
|
# total_height = len(stats_text) * (text_height + 18) # 5是行间距
|
|||
|
|
#
|
|||
|
|
# # 统计文本的起始位置(左上角)
|
|||
|
|
# start_x = 50 # 留出200像素宽度
|
|||
|
|
# start_y = 20 # 从顶部开始
|
|||
|
|
#
|
|||
|
|
# # # 绘制统计背景
|
|||
|
|
# # # bg_color = (0, 0, 0)
|
|||
|
|
# # frame_copy = cv2.rectangle(
|
|||
|
|
# # frame_copy,
|
|||
|
|
# # (start_x - 10, start_y - 10),
|
|||
|
|
# # (400, start_y + total_height + 20),
|
|||
|
|
# # bg_color,
|
|||
|
|
# # -1
|
|||
|
|
# # )
|
|||
|
|
#
|
|||
|
|
# # # 逐行绘制统计文本
|
|||
|
|
# # for i, text in enumerate(stats_text):
|
|||
|
|
# # y_pos = start_y + i * (text_height + 30)
|
|||
|
|
# # temp_img = frame_copy.copy()
|
|||
|
|
# # frame_copy = put_chinese_text(
|
|||
|
|
# # temp_img,
|
|||
|
|
# # text,
|
|||
|
|
# # (start_x, y_pos),
|
|||
|
|
# # color=text_color
|
|||
|
|
# # )
|
|||
|
|
#
|
|||
|
|
# new_data = {
|
|||
|
|
# 'frame_copy': frame_copy,
|
|||
|
|
# 'frame': frame,
|
|||
|
|
# "osd_info": osd_info,
|
|||
|
|
# 'detections': detections,
|
|||
|
|
# "message_point": message_point,
|
|||
|
|
# "cls_count": cls_count,
|
|||
|
|
# "target_point": target_point,
|
|||
|
|
# "model_list_func_id": model_list_func_id,
|
|||
|
|
# "model_func_id": model_func_id,
|
|||
|
|
# # 提取第一个模型的func_id 字段,因为现在已经要做整合,无法在区分各个提取第一个模型的func_id
|
|||
|
|
# 'timestamp': processed_data.get('timestamp'),
|
|||
|
|
# "detections_list": detections_list,
|
|||
|
|
# "model_para": model_para
|
|||
|
|
# # 'model_para': processed_data.get('model_para', {}) # 确保 model_para 存在
|
|||
|
|
# }
|
|||
|
|
# # 临时代码 rtmp 和侵限逻辑要改
|
|||
|
|
# if invade_state:
|
|||
|
|
# # para_list 中使能了 para_invade_enable,才做侵限判断
|
|||
|
|
# if para_invade_enable:
|
|||
|
|
# if not invade_queue.full():
|
|||
|
|
# await invade_queue.put(new_data)
|
|||
|
|
# else:
|
|||
|
|
# if not cv_frame_queue.full():
|
|||
|
|
# await cv_frame_queue.put(new_data)
|
|||
|
|
#
|
|||
|
|
# if not cv_frame_thread_queue.full():
|
|||
|
|
# cv_frame_thread_queue.put(new_data)
|
|||
|
|
#
|
|||
|
|
# time_end = time.time_ns()
|
|||
|
|
# pic_count = pic_count + 1
|
|||
|
|
# if time_end - time_start > 10000000000:
|
|||
|
|
# print(f"writeFrames {pic_count}")
|
|||
|
|
# pic_count = 0
|
|||
|
|
# time_start = time_end
|
|||
|
|
# event_queue.put({ # 用作测试录像
|
|||
|
|
# "timestamp": time.time_ns()
|
|||
|
|
# })
|
|||
|
|
#
|
|||
|
|
# #
|
|||
|
|
# # count_p = count_p + 1
|
|||
|
|
# # cv2.imwrite(f"save_pic/rtmp/test-{count_p}.jpg", frame_copy)
|
|||
|
|
# # video_writer.write(frame_copy)
|
|||
|
|
# # else:
|
|||
|
|
# # frame_copy = frame
|
|||
|
|
#
|
|||
|
|
# # 转换颜色空间
|
|||
|
|
# rgb_frame = cv2.cvtColor(frame_copy, cv2.COLOR_BGR2RGB)
|
|||
|
|
#
|
|||
|
|
# # # # # # 显示帧用于调试(可选)
|
|||
|
|
# # cv2.imshow(f"write_results_to_rtmp-{task_id}", frame_copy)
|
|||
|
|
# # if cv2.waitKey(1) & 0xFF == ord('q'):
|
|||
|
|
# # stop_event.set()
|
|||
|
|
# # break
|
|||
|
|
#
|
|||
|
|
# # 初始化推流容器(如果尚未初始化)
|
|||
|
|
# if output_url and output_url not in stream_containers:
|
|||
|
|
# try:
|
|||
|
|
# # container = av.open(output_url, mode='w', format='flv')
|
|||
|
|
# container = av.open(
|
|||
|
|
# output_url,
|
|||
|
|
# mode='w',
|
|||
|
|
# format='flv',
|
|||
|
|
# options={
|
|||
|
|
# # 添加RTMP连接超时(5秒)和数据超时(10秒)
|
|||
|
|
# 'rtmp_connect_timeout': '5000000', # 单位:微秒(5秒)
|
|||
|
|
# 'rtmp_timeout': '10000000', # 单位:微秒(10秒)
|
|||
|
|
# 'stimeout': '5000000' # 底层socket超时
|
|||
|
|
# }
|
|||
|
|
# )
|
|||
|
|
# stream = container.add_stream(codec_name, rate=Config.TARGET_FPS)
|
|||
|
|
# # stream.time_base = f"1/{Config.TARGET_FPS}"
|
|||
|
|
#
|
|||
|
|
# stream.width = frame.shape[1]
|
|||
|
|
#
|
|||
|
|
# stream.height = frame.shape[0]
|
|||
|
|
# stream.pix_fmt = 'yuv420p'
|
|||
|
|
# stream.options = options
|
|||
|
|
#
|
|||
|
|
# stream_containers[output_url] = {
|
|||
|
|
# 'container': container,
|
|||
|
|
# 'stream': stream,
|
|||
|
|
# 'last_frame_time': time.time(),
|
|||
|
|
# 'frame_count': 0,
|
|||
|
|
# 'retry_count': 0
|
|||
|
|
# }
|
|||
|
|
# print(f"✅ 推流初始化成功: {output_url}")
|
|||
|
|
# except Exception as e:
|
|||
|
|
# print(f"❌ 推流初始化失败: {e}")
|
|||
|
|
# if 'container' in locals():
|
|||
|
|
# try:
|
|||
|
|
# container.close()
|
|||
|
|
# except:
|
|||
|
|
# pass
|
|||
|
|
# await asyncio.sleep(1.0)
|
|||
|
|
# continue
|
|||
|
|
#
|
|||
|
|
# # 推流逻辑
|
|||
|
|
# if output_url and output_url in stream_containers:
|
|||
|
|
# try:
|
|||
|
|
# container_info = stream_containers[output_url]
|
|||
|
|
# stream = container_info['stream']
|
|||
|
|
# container = container_info['container']
|
|||
|
|
#
|
|||
|
|
# if rgb_frame.dtype == np.uint8:
|
|||
|
|
# av_frame = av.VideoFrame.from_ndarray(rgb_frame, format='rgb24')
|
|||
|
|
# packets = stream.encode(av_frame) # 这是异步方法
|
|||
|
|
# # print(f"📦 encode 生成 {len(packets)} 个 packet")
|
|||
|
|
#
|
|||
|
|
# if packets:
|
|||
|
|
# for packet in packets:
|
|||
|
|
# try:
|
|||
|
|
# container.mux(packet)
|
|||
|
|
# container_info['last_frame_time'] = time.time()
|
|||
|
|
# container_info['frame_count'] += 1
|
|||
|
|
# except Exception as e:
|
|||
|
|
# logger.warning(f"推流数据包错误: {e}")
|
|||
|
|
# container_info['retry_count'] += 1
|
|||
|
|
# if container_info['retry_count'] > max_retries:
|
|||
|
|
# raise
|
|||
|
|
# else:
|
|||
|
|
# # 编码器仍在初始化,不更新 last_frame_time
|
|||
|
|
# pass
|
|||
|
|
#
|
|||
|
|
# # 每100帧打印一次状态
|
|||
|
|
# # if container_info['frame_count'] % 100 == 0:
|
|||
|
|
# # print(f"ℹ️ 已推送 {container_info['frame_count']} 帧到 {output_url}")
|
|||
|
|
# if 'frame' in locals():
|
|||
|
|
# frame_time = time.time() - frame_start
|
|||
|
|
# print(f"推流帧耗时: {frame_time:.4f}s")
|
|||
|
|
# else:
|
|||
|
|
# print(f"⚠️ 无效帧格式: {rgb_frame.dtype}")
|
|||
|
|
# except Exception as e:
|
|||
|
|
# logger.error(f"❌ 推流错误: {e}")
|
|||
|
|
# # 尝试重新初始化推流
|
|||
|
|
# if output_url in stream_containers:
|
|||
|
|
# try:
|
|||
|
|
# stream_containers[output_url]['container'].close()
|
|||
|
|
# except:
|
|||
|
|
# pass
|
|||
|
|
# del stream_containers[output_url]
|
|||
|
|
# await asyncio.sleep(retry_delay)
|
|||
|
|
# continue
|
|||
|
|
#
|
|||
|
|
#
|
|||
|
|
# except asyncio.TimeoutError:
|
|||
|
|
# # if stop_event.is_set():
|
|||
|
|
# # break
|
|||
|
|
# continue
|
|||
|
|
# except asyncio.CancelledError:
|
|||
|
|
# print("write_results_to_rtmp 收到取消信号")
|
|||
|
|
# raise
|
|||
|
|
# except Exception as e:
|
|||
|
|
# logger.error(f"推流处理异常: {e}", exc_info=True)
|
|||
|
|
# await asyncio.sleep(0.1)
|
|||
|
|
#
|
|||
|
|
# finally:
|
|||
|
|
# # log_perf('write_results_to_rtmp', start_time)
|
|||
|
|
#
|
|||
|
|
# # 清理推流容器
|
|||
|
|
# for url, info in list(stream_containers.items()):
|
|||
|
|
# try:
|
|||
|
|
# if 'container' in info:
|
|||
|
|
# info['container'].close()
|
|||
|
|
# except Exception as e:
|
|||
|
|
# logger.warning(f"关闭推流容器时出错: {e}")
|
|||
|
|
# stream_containers.clear()
|
|||
|
|
#
|
|||
|
|
# # 清理视频写入器
|
|||
|
|
# if video_writer is not None:
|
|||
|
|
# video_writer.release()
|
|||
|
|
#
|
|||
|
|
# print("write_results_to_rtmp读取线程已停止")
|
|||
|
|
|
|||
|
|
async def write_results_to_rtmp_high_speed(task_id: str, output_url: str = None, input_fps: float = None,
|
|||
|
|
list_points: list[list[any]] = None, camera_para: Camera_Para = None,
|
|||
|
|
invade_state: bool = False, cancel_flag: asyncio.Event = None,
|
|||
|
|
cv_frame_thread_queue: Queue = None,
|
|||
|
|
processed_queue: asyncio.Queue = None,
|
|||
|
|
invade_queue: asyncio.Queue = None,
|
|||
|
|
cv_frame_queue: asyncio.Queue = None,
|
|||
|
|
event_queue: Optional[Queue] = None,
|
|||
|
|
stream_containers: Dict[str, Any] = None,
|
|||
|
|
):
|
|||
|
|
"""优化后的FFmpeg推流实现,解决画面模糊问题"""
|
|||
|
|
start_time = time.time()
|
|||
|
|
time_start = time.time_ns()
|
|||
|
|
pic_count = 0
|
|||
|
|
total_frames_processed = 0
|
|||
|
|
total_push_time = 0.0
|
|||
|
|
total_process_time = 0.0
|
|||
|
|
|
|||
|
|
# FFmpeg推流参数
|
|||
|
|
ffmpeg_process = None
|
|||
|
|
ffmpeg_input_pipe = None
|
|||
|
|
frame_width, frame_height = None, None
|
|||
|
|
fps = input_fps or Config.TARGET_FPS
|
|||
|
|
frame_interval = 1.0 / fps
|
|||
|
|
|
|||
|
|
# 性能统计
|
|||
|
|
perf_stats = {
|
|||
|
|
'encode_time': 0.0,
|
|||
|
|
'push_time': 0.0,
|
|||
|
|
'queue_wait': 0.0,
|
|||
|
|
'frame_process': 0.0
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
try:
|
|||
|
|
logger.info(f"【推流初始化】开始推流任务 {task_id}, 目标FPS: {fps}, 输出URL: {output_url}")
|
|||
|
|
|
|||
|
|
# 初始化FFmpeg推流进程
|
|||
|
|
if output_url:
|
|||
|
|
try:
|
|||
|
|
logger.info(f"【推流初始化】尝试初始化FFmpeg推流到: {output_url}")
|
|||
|
|
|
|||
|
|
# 先尝试获取一帧来确定分辨率,最多重试3次
|
|||
|
|
max_retries = 3
|
|||
|
|
frame_obtained = False
|
|||
|
|
processed_data = None
|
|||
|
|
|
|||
|
|
for retry_count in range(max_retries):
|
|||
|
|
logger.info(f"【推流初始化】尝试获取第一帧 (尝试 {retry_count + 1}/{max_retries})...")
|
|||
|
|
|
|||
|
|
try:
|
|||
|
|
# 等待第一帧数据来确定分辨率
|
|||
|
|
processed_data = await asyncio.wait_for(
|
|||
|
|
processed_queue.get(),
|
|||
|
|
timeout=3.0 # 3秒超时
|
|||
|
|
)
|
|||
|
|
logger.info(f"【推流初始化】第{retry_count + 1}次尝试获取到数据,类型: {type(processed_data)}")
|
|||
|
|
|
|||
|
|
if isinstance(processed_data, dict) and 'frame' in processed_data:
|
|||
|
|
frame_obtained = True
|
|||
|
|
logger.info(f"✅ 【推流初始化】成功获取有效帧数据")
|
|||
|
|
break
|
|||
|
|
else:
|
|||
|
|
logger.warning(f"⚠️ 【推流初始化】获取到无效数据,重新尝试...")
|
|||
|
|
# 如果是无效数据,重新放回队列
|
|||
|
|
if processed_data is not None:
|
|||
|
|
await processed_queue.put(processed_data)
|
|||
|
|
|
|||
|
|
except asyncio.TimeoutError:
|
|||
|
|
logger.warning(f"⏰ 【推流初始化】第{retry_count + 1}次尝试获取帧超时")
|
|||
|
|
if cancel_flag.is_set():
|
|||
|
|
logger.info("【推流初始化】任务被取消,退出")
|
|||
|
|
return
|
|||
|
|
continue
|
|||
|
|
|
|||
|
|
if not frame_obtained or processed_data is None:
|
|||
|
|
logger.error("❌ 【推流初始化】无法获取有效的帧数据,推流任务终止")
|
|||
|
|
return
|
|||
|
|
|
|||
|
|
# 验证帧数据
|
|||
|
|
frame = processed_data['frame']
|
|||
|
|
logger.info(f"【推流初始化】获取帧成功,形状: {frame.shape}, 数据类型: {frame.dtype}")
|
|||
|
|
|
|||
|
|
if frame is None or len(frame.shape) != 3 or frame.shape[2] != 3:
|
|||
|
|
logger.error(f"❌ 【推流初始化】帧格式错误: shape={frame.shape}, 期望(h,w,3)")
|
|||
|
|
# 重新放回队列以供其他消费者使用
|
|||
|
|
await processed_queue.put(processed_data)
|
|||
|
|
return
|
|||
|
|
|
|||
|
|
frame_height, frame_width = frame.shape[:2]
|
|||
|
|
logger.info(f"✅ 【推流初始化】检测到输入分辨率: {frame_width}x{frame_height}")
|
|||
|
|
|
|||
|
|
# 重新放回队列
|
|||
|
|
await processed_queue.put(processed_data)
|
|||
|
|
logger.debug(f"【推流初始化】将帧放回队列")
|
|||
|
|
|
|||
|
|
# 启动FFmpeg进程
|
|||
|
|
logger.info(f"【推流初始化】启动FFmpeg进程...")
|
|||
|
|
try:
|
|||
|
|
ffmpeg_process, ffmpeg_input_pipe = start_ffmpeg_stream(
|
|||
|
|
output_url,
|
|||
|
|
frame_width,
|
|||
|
|
frame_height,
|
|||
|
|
fps
|
|||
|
|
)
|
|||
|
|
logger.info(f"✅ 【推流初始化】FFmpeg进程启动成功,进程ID: {ffmpeg_process.pid}")
|
|||
|
|
except Exception as e:
|
|||
|
|
logger.error(f"❌ 【推流初始化】FFmpeg进程启动失败: {e}", exc_info=True)
|
|||
|
|
raise
|
|||
|
|
|
|||
|
|
# 检查进程是否正常
|
|||
|
|
if ffmpeg_process is None or ffmpeg_input_pipe is None:
|
|||
|
|
logger.error("❌ 【推流初始化】FFmpeg进程或管道为None")
|
|||
|
|
raise ValueError("FFmpeg进程或管道初始化失败")
|
|||
|
|
|
|||
|
|
if ffmpeg_process.poll() is not None:
|
|||
|
|
logger.error(f"❌ 【推流初始化】FFmpeg进程已退出,返回码: {ffmpeg_process.returncode}")
|
|||
|
|
# 尝试获取错误输出
|
|||
|
|
if ffmpeg_process.stderr:
|
|||
|
|
try:
|
|||
|
|
stderr_output = ffmpeg_process.stderr.read()
|
|||
|
|
logger.error(f"❌ 【推流初始化】FFmpeg错误输出: {stderr_output.decode('utf-8', errors='ignore')}")
|
|||
|
|
except:
|
|||
|
|
pass
|
|||
|
|
raise ValueError("FFmpeg进程启动后立即退出")
|
|||
|
|
|
|||
|
|
stream_containers[output_url] = {
|
|||
|
|
'process': ffmpeg_process,
|
|||
|
|
'pipe': ffmpeg_input_pipe,
|
|||
|
|
'frame_count': 0,
|
|||
|
|
'last_frame_time': time.time(),
|
|||
|
|
'retry_count': 0,
|
|||
|
|
'width': frame_width,
|
|||
|
|
'height': frame_height,
|
|||
|
|
'fps': fps
|
|||
|
|
}
|
|||
|
|
logger.info(f"✅ 【推流初始化】FFmpeg推流初始化成功: {output_url}")
|
|||
|
|
|
|||
|
|
except Exception as e:
|
|||
|
|
logger.error(f"❌ 【推流初始化】FFmpeg推流初始化失败: {e}", exc_info=True)
|
|||
|
|
# 清理已创建的资源
|
|||
|
|
await cleanup_ffmpeg(ffmpeg_process, ffmpeg_input_pipe, output_url, stream_containers)
|
|||
|
|
return
|
|||
|
|
|
|||
|
|
logger.info(f"【推流启动】开始主推流循环,目标FPS: {fps}")
|
|||
|
|
|
|||
|
|
# 添加一个初始化完成标志
|
|||
|
|
initialization_complete = True
|
|||
|
|
consecutive_timeouts = 0
|
|||
|
|
max_consecutive_timeouts = 10
|
|||
|
|
|
|||
|
|
while not cancel_flag.is_set():
|
|||
|
|
frame_start = time.time()
|
|||
|
|
process_start = time.perf_counter()
|
|||
|
|
|
|||
|
|
try:
|
|||
|
|
# 获取处理后的数据
|
|||
|
|
get_start = time.perf_counter()
|
|||
|
|
try:
|
|||
|
|
# 如果推流初始化完成,但队列为空,可以适当等待
|
|||
|
|
timeout_value = 0.5 if initialization_complete else 0.1
|
|||
|
|
processed_data = await asyncio.wait_for(
|
|||
|
|
processed_queue.get(),
|
|||
|
|
timeout=timeout_value
|
|||
|
|
)
|
|||
|
|
consecutive_timeouts = 0 # 重置超时计数
|
|||
|
|
except asyncio.TimeoutError:
|
|||
|
|
consecutive_timeouts += 1
|
|||
|
|
|
|||
|
|
# 如果连续超时次数过多,检查系统状态
|
|||
|
|
if consecutive_timeouts >= max_consecutive_timeouts:
|
|||
|
|
logger.warning(f"⚠️ 【队列状态】连续{consecutive_timeouts}次从队列获取数据超时")
|
|||
|
|
|
|||
|
|
# 检查是否有其他消费者在工作
|
|||
|
|
if processed_queue.qsize() == 0:
|
|||
|
|
logger.warning("⚠️ 【队列状态】处理队列为空,检查上游处理")
|
|||
|
|
|
|||
|
|
# 重置计数器避免过多日志
|
|||
|
|
consecutive_timeouts = 0
|
|||
|
|
|
|||
|
|
# 检查FFmpeg进程状态
|
|||
|
|
if output_url and ffmpeg_process and ffmpeg_process.poll() is not None:
|
|||
|
|
logger.error(f"🔴 【进程检查】FFmpeg进程已退出,返回码: {ffmpeg_process.returncode}")
|
|||
|
|
# 尝试自动恢复
|
|||
|
|
if output_url in stream_containers:
|
|||
|
|
await attempt_ffmpeg_recovery(output_url, stream_containers, frame_width, frame_height, fps)
|
|||
|
|
continue
|
|||
|
|
|
|||
|
|
get_time = time.perf_counter() - get_start
|
|||
|
|
perf_stats['queue_wait'] += get_time
|
|||
|
|
|
|||
|
|
if not isinstance(processed_data, dict) or 'frame' not in processed_data:
|
|||
|
|
logger.warning(f"⚠️ 【帧处理】无效的帧数据: {type(processed_data)}")
|
|||
|
|
continue
|
|||
|
|
|
|||
|
|
frame = processed_data['frame']
|
|||
|
|
|
|||
|
|
# 检查帧的有效性
|
|||
|
|
if frame is None or len(frame.shape) != 3 or frame.shape[2] != 3:
|
|||
|
|
logger.warning(f"⚠️ 【帧处理】无效的帧: shape={frame.shape if frame is not None else 'None'}")
|
|||
|
|
continue
|
|||
|
|
|
|||
|
|
# 检查分辨率是否一致
|
|||
|
|
current_height, current_width = frame.shape[:2]
|
|||
|
|
if frame_width is None or frame_height is None:
|
|||
|
|
frame_width, frame_height = current_width, current_height
|
|||
|
|
logger.info(f"📏 【分辨率设置】设置初始分辨率: {frame_width}x{frame_height}")
|
|||
|
|
|
|||
|
|
if (current_width, current_height) != (frame_width, frame_height):
|
|||
|
|
logger.warning(f"⚠️ 【分辨率警告】分辨率变化: 从 {frame_width}x{frame_height} 变为 {current_width}x{current_height}")
|
|||
|
|
frame_width, frame_height = current_width, current_height
|
|||
|
|
# 如果分辨率变化,可能需要重新初始化FFmpeg
|
|||
|
|
if output_url and output_url in stream_containers:
|
|||
|
|
container_info = stream_containers[output_url]
|
|||
|
|
if container_info['width'] != frame_width or container_info['height'] != frame_height:
|
|||
|
|
logger.info(f"🔄 【分辨率变更】检测到分辨率变化,重新初始化FFmpeg")
|
|||
|
|
await attempt_ffmpeg_recovery(output_url, stream_containers, frame_width, frame_height, fps)
|
|||
|
|
|
|||
|
|
# 推流逻辑
|
|||
|
|
if output_url and ffmpeg_input_pipe:
|
|||
|
|
push_start = time.perf_counter()
|
|||
|
|
|
|||
|
|
try:
|
|||
|
|
# 检查FFmpeg进程状态
|
|||
|
|
if ffmpeg_process.poll() is not None:
|
|||
|
|
logger.error(f"🔴 【推流错误】FFmpeg进程已退出,返回码: {ffmpeg_process.returncode}")
|
|||
|
|
# 尝试自动恢复
|
|||
|
|
if output_url in stream_containers:
|
|||
|
|
await attempt_ffmpeg_recovery(output_url, stream_containers, frame_width, frame_height, fps)
|
|||
|
|
continue
|
|||
|
|
|
|||
|
|
# 确保帧是连续的内存布局
|
|||
|
|
if not frame.flags['C_CONTIGUOUS']:
|
|||
|
|
frame = np.ascontiguousarray(frame)
|
|||
|
|
|
|||
|
|
# 转换为RGB格式
|
|||
|
|
convert_start = time.perf_counter()
|
|||
|
|
try:
|
|||
|
|
rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
|
|||
|
|
except Exception as e:
|
|||
|
|
logger.error(f"🎨 【颜色转换】BGR转RGB失败: {e}")
|
|||
|
|
continue
|
|||
|
|
convert_time = time.perf_counter() - convert_start
|
|||
|
|
perf_stats['encode_time'] += convert_time
|
|||
|
|
|
|||
|
|
# 写入到FFmpeg的stdin
|
|||
|
|
write_start = time.perf_counter()
|
|||
|
|
try:
|
|||
|
|
frame_bytes = rgb_frame.tobytes()
|
|||
|
|
|
|||
|
|
# 写入帧数据
|
|||
|
|
ffmpeg_input_pipe.write(frame_bytes)
|
|||
|
|
ffmpeg_input_pipe.flush()
|
|||
|
|
|
|||
|
|
write_time = time.perf_counter() - write_start
|
|||
|
|
perf_stats['push_time'] += write_time
|
|||
|
|
|
|||
|
|
# 更新统计
|
|||
|
|
stream_containers[output_url]['frame_count'] += 1
|
|||
|
|
stream_containers[output_url]['last_frame_time'] = time.time()
|
|||
|
|
|
|||
|
|
if stream_containers[output_url]['frame_count'] % 100 == 0:
|
|||
|
|
logger.info(f"📤 【推流统计】{output_url} 已推流 {stream_containers[output_url]['frame_count']} 帧")
|
|||
|
|
|
|||
|
|
except BrokenPipeError as e:
|
|||
|
|
logger.error(f"🔌 【写入错误】管道损坏: {e}")
|
|||
|
|
# 尝试自动恢复
|
|||
|
|
if output_url in stream_containers:
|
|||
|
|
await attempt_ffmpeg_recovery(output_url, stream_containers, frame_width, frame_height, fps)
|
|||
|
|
continue
|
|||
|
|
except Exception as e:
|
|||
|
|
logger.error(f"❌ 【写入错误】写入帧失败: {e}", exc_info=True)
|
|||
|
|
continue
|
|||
|
|
|
|||
|
|
push_time = time.perf_counter() - push_start
|
|||
|
|
total_push_time += push_time
|
|||
|
|
|
|||
|
|
except Exception as e:
|
|||
|
|
logger.error(f"⚠️ 【推流异常】处理异常: {e}")
|
|||
|
|
continue
|
|||
|
|
|
|||
|
|
# 统计信息
|
|||
|
|
pic_count += 1
|
|||
|
|
total_frames_processed += 1
|
|||
|
|
process_time = time.perf_counter() - process_start
|
|||
|
|
total_process_time += process_time
|
|||
|
|
|
|||
|
|
# 每10秒输出统计信息
|
|||
|
|
if time.time_ns() - time_start > 10000000000: # 10秒统计一次
|
|||
|
|
elapsed_seconds = (time.time_ns() - time_start) / 1e9
|
|||
|
|
actual_fps = pic_count / elapsed_seconds if elapsed_seconds > 0 else 0
|
|||
|
|
avg_process_time = total_process_time / total_frames_processed if total_frames_processed > 0 else 0
|
|||
|
|
avg_push_time = total_push_time / total_frames_processed if total_frames_processed > 0 else 0
|
|||
|
|
|
|||
|
|
# 队列状态
|
|||
|
|
queue_size = processed_queue.qsize()
|
|||
|
|
logger.info(f"📊 【性能统计】任务: {task_id[:8]}... | "
|
|||
|
|
f"帧数: {total_frames_processed} | "
|
|||
|
|
f"FPS: {actual_fps:.2f} | "
|
|||
|
|
f"队列: {queue_size} | "
|
|||
|
|
f"处理: {avg_process_time*1000:.1f}ms | "
|
|||
|
|
f"推流: {avg_push_time*1000:.1f}ms")
|
|||
|
|
|
|||
|
|
# 重置统计
|
|||
|
|
pic_count = 0
|
|||
|
|
total_process_time = 0.0
|
|||
|
|
total_push_time = 0.0
|
|||
|
|
time_start = time.time_ns()
|
|||
|
|
|
|||
|
|
# 帧率控制
|
|||
|
|
frame_time = time.time() - frame_start
|
|||
|
|
sleep_time = frame_interval - frame_time
|
|||
|
|
if sleep_time > 0:
|
|||
|
|
await asyncio.sleep(sleep_time)
|
|||
|
|
else:
|
|||
|
|
if frame_time > frame_interval * 1.5: # 只记录严重的延迟
|
|||
|
|
logger.debug(f"🐢 【帧率警告】处理时间 {frame_time*1000:.1f}ms 超过帧间隔 {frame_interval*1000:.1f}ms")
|
|||
|
|
|
|||
|
|
except asyncio.CancelledError:
|
|||
|
|
logger.info(f"⏹️ 【推流任务】{task_id} 收到取消信号")
|
|||
|
|
raise
|
|||
|
|
except Exception as e:
|
|||
|
|
logger.error(f"❌ 【主循环异常】推流处理异常: {e}", exc_info=True)
|
|||
|
|
await asyncio.sleep(0.1) # 避免CPU过度占用
|
|||
|
|
|
|||
|
|
except asyncio.CancelledError:
|
|||
|
|
logger.info(f"⏹️ 【任务取消】推流任务 {task_id} 被取消")
|
|||
|
|
except Exception as e:
|
|||
|
|
logger.error(f"❌ 【任务异常】推流任务 {task_id} 异常退出: {e}", exc_info=True)
|
|||
|
|
finally:
|
|||
|
|
# 清理资源
|
|||
|
|
logger.info(f"🧹 【清理资源】开始清理推流任务: {task_id}")
|
|||
|
|
|
|||
|
|
if output_url in stream_containers:
|
|||
|
|
logger.info(f"🧹 【清理资源】清理容器中的资源: {output_url}")
|
|||
|
|
|
|||
|
|
if ffmpeg_process or ffmpeg_input_pipe:
|
|||
|
|
await cleanup_ffmpeg(ffmpeg_process, ffmpeg_input_pipe, output_url, stream_containers)
|
|||
|
|
|
|||
|
|
elapsed_time = time.time() - start_time
|
|||
|
|
if elapsed_time > 0:
|
|||
|
|
avg_fps = total_frames_processed / elapsed_time
|
|||
|
|
status_msg = f"✅ 【任务完成】推流任务 {task_id[:8]}... "
|
|||
|
|
if total_frames_processed > 0:
|
|||
|
|
status_msg += f"成功推流 {total_frames_processed} 帧,平均FPS: {avg_fps:.2f}"
|
|||
|
|
else:
|
|||
|
|
status_msg += f"未处理任何帧"
|
|||
|
|
logger.info(status_msg)
|
|||
|
|
|
|||
|
|
|
|||
|
|
async def attempt_ffmpeg_recovery(output_url, stream_containers, width, height, fps, max_retries=3):
|
|||
|
|
"""尝试恢复FFmpeg推流"""
|
|||
|
|
if output_url not in stream_containers:
|
|||
|
|
logger.error(f"❌ 【恢复失败】容器中没有找到: {output_url}")
|
|||
|
|
return False
|
|||
|
|
|
|||
|
|
container = stream_containers[output_url]
|
|||
|
|
|
|||
|
|
# 如果重试次数过多,不再尝试
|
|||
|
|
if container.get('retry_count', 0) >= max_retries:
|
|||
|
|
logger.error(f"❌ 【恢复失败】已达到最大重试次数: {max_retries}")
|
|||
|
|
return False
|
|||
|
|
|
|||
|
|
for retry in range(max_retries):
|
|||
|
|
try:
|
|||
|
|
logger.info(f"🔄 【重连尝试】第{retry + 1}次尝试重新连接FFmpeg...")
|
|||
|
|
|
|||
|
|
# 清理旧资源
|
|||
|
|
old_process = container.get('process')
|
|||
|
|
old_pipe = container.get('pipe')
|
|||
|
|
|
|||
|
|
if old_pipe:
|
|||
|
|
try:
|
|||
|
|
old_pipe.close()
|
|||
|
|
except:
|
|||
|
|
pass
|
|||
|
|
|
|||
|
|
if old_process and old_process.poll() is None:
|
|||
|
|
try:
|
|||
|
|
old_process.terminate()
|
|||
|
|
old_process.wait(timeout=1)
|
|||
|
|
except:
|
|||
|
|
pass
|
|||
|
|
|
|||
|
|
# 重新初始化FFmpeg
|
|||
|
|
ffmpeg_process, ffmpeg_input_pipe = start_ffmpeg_stream(
|
|||
|
|
output_url,
|
|||
|
|
width,
|
|||
|
|
height,
|
|||
|
|
fps
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
# 更新容器
|
|||
|
|
stream_containers[output_url] = {
|
|||
|
|
'process': ffmpeg_process,
|
|||
|
|
'pipe': ffmpeg_input_pipe,
|
|||
|
|
'frame_count': 0,
|
|||
|
|
'last_frame_time': time.time(),
|
|||
|
|
'retry_count': container.get('retry_count', 0) + 1,
|
|||
|
|
'width': width,
|
|||
|
|
'height': height,
|
|||
|
|
'fps': fps
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
logger.info(f"✅ 【重连成功】FFmpeg重新初始化成功,重试次数: {stream_containers[output_url]['retry_count']}")
|
|||
|
|
return True
|
|||
|
|
|
|||
|
|
except Exception as e:
|
|||
|
|
logger.error(f"❌ 【重连失败】第{retry + 1}次尝试失败: {e}")
|
|||
|
|
await asyncio.sleep(1) # 等待1秒后重试
|
|||
|
|
|
|||
|
|
logger.error(f"❌ 【重连失败】所有{max_retries}次尝试都失败了")
|
|||
|
|
return False
|
|||
|
|
|
|||
|
|
def start_ffmpeg_stream(output_url: str, width: int, height: int, fps: int = 30):
|
|||
|
|
"""启动FFmpeg推流进程"""
|
|||
|
|
logger.info(f"🚀 【启动FFmpeg】开始启动FFmpeg: {width}x{height}@{fps}fps -> {output_url}")
|
|||
|
|
|
|||
|
|
# FFmpeg推流参数 - 针对RTMP优化
|
|||
|
|
ffmpeg_cmd = [
|
|||
|
|
'ffmpeg',
|
|||
|
|
'-re', # 按照实际帧率读取输入
|
|||
|
|
'-y', # 覆盖输出文件
|
|||
|
|
'-f', 'rawvideo', # 输入格式
|
|||
|
|
'-vcodec', 'rawvideo', # 输入编码
|
|||
|
|
'-pix_fmt', 'rgb24', # 输入像素格式
|
|||
|
|
'-s', f'{width}x{height}', # 分辨率
|
|||
|
|
'-r', str(fps), # 帧率
|
|||
|
|
'-i', '-', # 从标准输入读取
|
|||
|
|
'-c:v', 'libx264', # 视频编码
|
|||
|
|
'-preset', 'ultrafast', # 编码速度
|
|||
|
|
'-tune', 'zerolatency', # 零延迟
|
|||
|
|
'-pix_fmt', 'yuv420p', # 输出像素格式
|
|||
|
|
'-b:v', '2000k', # 视频比特率
|
|||
|
|
'-maxrate', '2500k', # 最大比特率
|
|||
|
|
'-bufsize', '1000k', # 缓冲区大小
|
|||
|
|
'-f', 'flv', # 输出格式
|
|||
|
|
'-flvflags', 'no_duration_filesize', # FLV标志
|
|||
|
|
output_url
|
|||
|
|
]
|
|||
|
|
|
|||
|
|
logger.info(f"📋 【FFmpeg命令】执行命令: {' '.join(ffmpeg_cmd)}")
|
|||
|
|
|
|||
|
|
try:
|
|||
|
|
# 启动FFmpeg进程
|
|||
|
|
logger.info("⏳ 【进程启动】启动FFmpeg子进程...")
|
|||
|
|
process = subprocess.Popen(
|
|||
|
|
ffmpeg_cmd,
|
|||
|
|
stdin=subprocess.PIPE,
|
|||
|
|
stdout=subprocess.DEVNULL,
|
|||
|
|
stderr=subprocess.PIPE,
|
|||
|
|
bufsize=width * height * 3, # 缓冲区大小为1帧
|
|||
|
|
text=False
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
# 检查进程是否启动成功
|
|||
|
|
import time as time_module
|
|||
|
|
time_module.sleep(0.1) # 给进程一点时间启动
|
|||
|
|
if process.poll() is not None:
|
|||
|
|
# 进程已退出,读取错误信息
|
|||
|
|
stderr_output = process.stderr.read()
|
|||
|
|
if stderr_output:
|
|||
|
|
error_msg = stderr_output.decode('utf-8', errors='ignore')
|
|||
|
|
logger.error(f"❌ 【FFmpeg启动失败】进程立即退出,错误: {error_msg}")
|
|||
|
|
else:
|
|||
|
|
logger.error("❌ 【FFmpeg启动失败】进程立即退出,无错误输出")
|
|||
|
|
raise RuntimeError(f"FFmpeg进程启动失败,返回码: {process.returncode}")
|
|||
|
|
|
|||
|
|
logger.info(f"✅ 【FFmpeg启动】进程启动成功,PID: {process.pid}")
|
|||
|
|
|
|||
|
|
# 启动一个线程来读取stderr(避免阻塞)
|
|||
|
|
def read_stderr():
|
|||
|
|
logger.info("👂 【读取日志】开始读取FFmpeg日志...")
|
|||
|
|
while True:
|
|||
|
|
try:
|
|||
|
|
line = process.stderr.readline()
|
|||
|
|
if not line:
|
|||
|
|
break
|
|||
|
|
line_str = line.decode('utf-8', errors='ignore').strip()
|
|||
|
|
|
|||
|
|
# 根据日志级别输出
|
|||
|
|
if 'error' in line_str.lower() or 'fail' in line_str.lower():
|
|||
|
|
logger.error(f"🔴 【FFmpeg错误】{line_str}")
|
|||
|
|
elif 'warning' in line_str.lower():
|
|||
|
|
logger.warning(f"🟡 【FFmpeg警告】{line_str}")
|
|||
|
|
elif 'frame=' in line_str and 'fps=' in line_str:
|
|||
|
|
# 帧率信息,可调低日志级别
|
|||
|
|
logger.debug(f"📈 【FFmpeg状态】{line_str}")
|
|||
|
|
elif line_str: # 其他信息
|
|||
|
|
logger.debug(f"ℹ️ 【FFmpeg信息】{line_str}")
|
|||
|
|
except Exception as e:
|
|||
|
|
logger.error(f"❌ 【读取日志错误】读取FFmpeg日志失败: {e}")
|
|||
|
|
break
|
|||
|
|
|
|||
|
|
import threading
|
|||
|
|
stderr_thread = threading.Thread(target=read_stderr, daemon=True)
|
|||
|
|
stderr_thread.start()
|
|||
|
|
logger.info("✅ 【日志线程】FFmpeg日志线程启动")
|
|||
|
|
|
|||
|
|
return process, process.stdin
|
|||
|
|
|
|||
|
|
except FileNotFoundError as e:
|
|||
|
|
logger.error(f"❌ 【FFmpeg未找到】请确保FFmpeg已安装并在PATH中: {e}")
|
|||
|
|
raise
|
|||
|
|
except Exception as e:
|
|||
|
|
logger.error(f"❌ 【FFmpeg启动异常】启动FFmpeg进程失败: {e}", exc_info=True)
|
|||
|
|
raise
|
|||
|
|
|
|||
|
|
async def cleanup_ffmpeg(process, pipe, output_url, stream_containers):
|
|||
|
|
"""清理FFmpeg资源"""
|
|||
|
|
logger.info(f"🧹 【清理】开始清理FFmpeg资源: {output_url}")
|
|||
|
|
|
|||
|
|
try:
|
|||
|
|
if pipe:
|
|||
|
|
logger.debug("🧹 【清理】关闭管道...")
|
|||
|
|
try:
|
|||
|
|
pipe.close()
|
|||
|
|
logger.debug("✅ 【清理】管道关闭成功")
|
|||
|
|
except Exception as e:
|
|||
|
|
logger.error(f"❌ 【清理】关闭管道错误: {e}")
|
|||
|
|
except Exception as e:
|
|||
|
|
logger.error(f"❌ 【清理】管道清理异常: {e}")
|
|||
|
|
|
|||
|
|
try:
|
|||
|
|
if process:
|
|||
|
|
logger.debug(f"🧹 【清理】终止FFmpeg进程 {process.pid}...")
|
|||
|
|
try:
|
|||
|
|
# 发送SIGTERM信号
|
|||
|
|
process.terminate()
|
|||
|
|
try:
|
|||
|
|
# 等待进程结束
|
|||
|
|
await asyncio.wait_for(asyncio.get_event_loop().run_in_executor(None, process.wait), timeout=5)
|
|||
|
|
logger.debug(f"✅ 【清理】FFmpeg进程 {process.pid} 正常终止")
|
|||
|
|
except (asyncio.TimeoutError, subprocess.TimeoutExpired):
|
|||
|
|
# 如果超时,强制杀死
|
|||
|
|
logger.warning(f"⚠️ 【清理】FFmpeg进程 {process.pid} 未正常退出,强制杀死")
|
|||
|
|
process.kill()
|
|||
|
|
process.wait()
|
|||
|
|
logger.debug(f"✅ 【清理】FFmpeg进程 {process.pid} 强制杀死完成")
|
|||
|
|
except Exception as e:
|
|||
|
|
logger.error(f"❌ 【清理】终止FFmpeg进程错误: {e}")
|
|||
|
|
except Exception as e:
|
|||
|
|
logger.error(f"❌ 【清理】进程清理异常: {e}")
|
|||
|
|
|
|||
|
|
# 从容器中移除
|
|||
|
|
if output_url and output_url in stream_containers:
|
|||
|
|
logger.debug(f"🧹 【清理】从容器中移除: {output_url}")
|
|||
|
|
del stream_containers[output_url]
|
|||
|
|
logger.debug(f"✅ 【清理】容器清理完成: {output_url}")
|
|||
|
|
|
|||
|
|
logger.info(f"✅ 【清理完成】FFmpeg资源清理完成: {output_url}")
|
|||
|
|
|
|||
|
|
|
|||
|
|
|
|||
|
|
|
|||
|
|
async def rtmp_process_async_high_speed(group_id: str, video_url: str, task_id: str,
|
|||
|
|
model_configs: List[Dict],
|
|||
|
|
mqtt_pub_ip: str, mqtt_pub_port: int,
|
|||
|
|
mqtt_pub_topic: str,
|
|||
|
|
mqtt_sub_ip: str, mqtt_sub_port: int,
|
|||
|
|
mqtt_sub_topic: str,
|
|||
|
|
output_rtmp_url: str,
|
|||
|
|
invade_enable: bool, invade_file: str,
|
|||
|
|
camera_para_url: str,
|
|||
|
|
cv_async_frame_queue: Queue, event_queue: Queue, invade_queue: Queue,
|
|||
|
|
stop_event: Event, performance_counter, timestamp_frame_queue):
|
|||
|
|
"""进程A的异步主逻辑 - 高速版本"""
|
|||
|
|
|
|||
|
|
logger.info(f"[ProcessA] 启动高速处理引擎")
|
|||
|
|
logger.info(f"[ProcessA] 目标FPS: 20+")
|
|||
|
|
|
|||
|
|
# 初始化异步队列
|
|||
|
|
cancel_flag = asyncio.Event()
|
|||
|
|
frame_queue = asyncio.Queue(maxsize=Config.FRAME_QUEUE_SIZE)
|
|||
|
|
processed_queue = asyncio.Queue(maxsize=Config.PROCESSED_QUEUE_SIZE * 2) # 增大队列
|
|||
|
|
cv_frame_queue = asyncio.Queue(maxsize=Config.PROCESSED_QUEUE_SIZE)
|
|||
|
|
event_async_queue = asyncio.Queue(maxsize=Config.PROCESSED_QUEUE_SIZE)
|
|||
|
|
|
|||
|
|
stream_containers: Dict[str, Any] = {}
|
|||
|
|
|
|||
|
|
loop = asyncio.get_running_loop()
|
|||
|
|
|
|||
|
|
try:
|
|||
|
|
# 加载侵限区域数据
|
|||
|
|
list_points = []
|
|||
|
|
camera_para = None
|
|||
|
|
if invade_file and camera_para_url:
|
|||
|
|
try:
|
|||
|
|
camera_file_path = downFile(camera_para_url)
|
|||
|
|
camera_para = read_camera_params(camera_file_path)
|
|||
|
|
except Exception as e:
|
|||
|
|
logger.warning(f"[ProcessA] 加载相机参数失败: {e}")
|
|||
|
|
|
|||
|
|
# 初始化模型
|
|||
|
|
# detector = MultiYoloTrtDetectorTrackId_TRT8(model_configs)
|
|||
|
|
detector = MultiYoloTrtDetectorTrackId_TRT10_YOLO11(model_configs)
|
|||
|
|
|
|||
|
|
logger.info(f"[ProcessA] 模型加载完成")
|
|||
|
|
|
|||
|
|
# 创建任务
|
|||
|
|
tasks = []
|
|||
|
|
process_task = None
|
|||
|
|
write_task = None
|
|||
|
|
|
|||
|
|
# 初始化MQTT设备(如果需要)
|
|||
|
|
device = None
|
|||
|
|
if mqtt_sub_ip and mqtt_sub_port and mqtt_sub_topic:
|
|||
|
|
try:
|
|||
|
|
device = MQTTDevice(host=mqtt_sub_ip, port=mqtt_sub_port,
|
|||
|
|
client_id=f"{task_id}_sub")
|
|||
|
|
device.subscribe(topic=mqtt_sub_topic, callback=empty_osd_callback)
|
|||
|
|
logger.info(f"[ProcessA] MQTT订阅已初始化: {mqtt_sub_topic}")
|
|||
|
|
except Exception as e:
|
|||
|
|
logger.warning(f"[ProcessA] MQTT初始化失败: {e}")
|
|||
|
|
|
|||
|
|
try:
|
|||
|
|
# 创建线程池
|
|||
|
|
read_rtmp_executor = ThreadPoolExecutor(max_workers=Config.READ_RTMP_WORKERS,
|
|||
|
|
thread_name_prefix=f"{group_id}_read_rtmp")
|
|||
|
|
# 任务1: 拉流
|
|||
|
|
read_task = asyncio.create_task(
|
|||
|
|
read_rtmp_frames(
|
|||
|
|
loop=loop,
|
|||
|
|
read_rtmp_frames_executor=read_rtmp_executor,
|
|||
|
|
video_url=video_url,
|
|||
|
|
device=device,
|
|||
|
|
topic_camera_osd=mqtt_sub_topic,
|
|||
|
|
method_camera_osd="drc_camera_osd_info_push",
|
|||
|
|
topic_osd_info=mqtt_sub_topic,
|
|||
|
|
method_osd_info="osd_info_push",
|
|||
|
|
cancel_flag=cancel_flag,
|
|||
|
|
frame_queue=frame_queue,
|
|||
|
|
timestamp_frame_queue=timestamp_frame_queue
|
|||
|
|
),
|
|||
|
|
name=f"{group_id}_read_rtmp"
|
|||
|
|
)
|
|||
|
|
tasks.append(read_task)
|
|||
|
|
|
|||
|
|
# 高速处理任务
|
|||
|
|
process_frame_executor = ThreadPoolExecutor(max_workers=Config.PROCESS_FRAME_WORKERS)
|
|||
|
|
process_task = asyncio.create_task(
|
|||
|
|
high_speed_process_frames(detector, cancel_flag, frame_queue, processed_queue),
|
|||
|
|
name=f"{group_id}_process_frames"
|
|||
|
|
)
|
|||
|
|
tasks.append(process_task)
|
|||
|
|
|
|||
|
|
# 高速推流任务
|
|||
|
|
invade_state = bool(list_points) and invade_enable
|
|||
|
|
write_frame_executor = ThreadPoolExecutor(max_workers=Config.WRITE_FRAME_WORKERS)
|
|||
|
|
write_task = asyncio.create_task(
|
|||
|
|
write_results_to_rtmp_high_speed(
|
|||
|
|
task_id,
|
|||
|
|
output_rtmp_url,
|
|||
|
|
None,
|
|||
|
|
list_points,
|
|||
|
|
camera_para,
|
|||
|
|
invade_state,
|
|||
|
|
cancel_flag,
|
|||
|
|
cv_async_frame_queue,
|
|||
|
|
processed_queue,
|
|||
|
|
invade_queue,
|
|||
|
|
cv_frame_queue,
|
|||
|
|
event_queue,
|
|||
|
|
stream_containers
|
|||
|
|
),
|
|||
|
|
name=f"{group_id}_write_rtmp"
|
|||
|
|
)
|
|||
|
|
tasks.append(write_task)
|
|||
|
|
|
|||
|
|
# 等待停止事件
|
|||
|
|
while not (stop_event.is_set() or cancel_flag.is_set()):
|
|||
|
|
await asyncio.sleep(1)
|
|||
|
|
|
|||
|
|
except asyncio.CancelledError:
|
|||
|
|
logger.info(f"[ProcessA] 任务被取消")
|
|||
|
|
except Exception as e:
|
|||
|
|
logger.error(f"[ProcessA] 任务异常: {e}")
|
|||
|
|
raise
|
|||
|
|
finally:
|
|||
|
|
# 停止所有任务
|
|||
|
|
cancel_flag.set()
|
|||
|
|
for task in tasks:
|
|||
|
|
if task and not task.done():
|
|||
|
|
task.cancel()
|
|||
|
|
|
|||
|
|
# 等待任务结束
|
|||
|
|
if tasks:
|
|||
|
|
await asyncio.gather(*tasks, return_exceptions=True)
|
|||
|
|
|
|||
|
|
except Exception as e:
|
|||
|
|
logger.error(f"[ProcessA] 异常: {e}")
|
|||
|
|
raise
|
|||
|
|
finally:
|
|||
|
|
# 清理资源
|
|||
|
|
if 'detector' in locals():
|
|||
|
|
detector.destroy()
|
|||
|
|
logger.info(f"[ProcessA] 高速处理结束")
|
|||
|
|
|
|||
|
|
|
|||
|
|
def rtmp_process_main(group_id: str, video_url: str, task_id: str,
|
|||
|
|
model_configs: List[Dict],
|
|||
|
|
mqtt_pub_ip: str, mqtt_pub_port: int,
|
|||
|
|
mqtt_pub_topic: str,
|
|||
|
|
mqtt_sub_ip: str, mqtt_sub_port: int,
|
|||
|
|
mqtt_sub_topic: str,
|
|||
|
|
output_rtmp_url: str,
|
|||
|
|
invade_enable: bool, invade_file: str,
|
|||
|
|
camera_para_url: str,
|
|||
|
|
group_queues: Dict):
|
|||
|
|
"""进程A的主函数 - 高速版本"""
|
|||
|
|
|
|||
|
|
pid = os.getpid()
|
|||
|
|
logger.info(f"[ProcessA] 启动 (PID:{pid}),任务ID: {task_id}")
|
|||
|
|
|
|||
|
|
# 检查CPU状态
|
|||
|
|
check_cpu_affinity()
|
|||
|
|
|
|||
|
|
try:
|
|||
|
|
asyncio.run(rtmp_process_async_high_speed(
|
|||
|
|
group_id, video_url, task_id, model_configs,
|
|||
|
|
mqtt_pub_ip, mqtt_pub_port, mqtt_pub_topic,
|
|||
|
|
mqtt_sub_ip, mqtt_sub_port, mqtt_sub_topic,
|
|||
|
|
output_rtmp_url, invade_enable, invade_file,
|
|||
|
|
camera_para_url,
|
|||
|
|
group_queues.get('cv_frame_queue'),
|
|||
|
|
group_queues.get('event_queue'),
|
|||
|
|
group_queues.get('invade_queue'),
|
|||
|
|
group_queues.get('stop_event'),
|
|||
|
|
group_queues.get('performance_counter'),
|
|||
|
|
group_queues.get('timestamp_frame_queue')
|
|||
|
|
))
|
|||
|
|
except Exception as e:
|
|||
|
|
logger.error(f"[ProcessA] 异常: {e}")
|
|||
|
|
error_queue = group_queues.get('error_queue')
|
|||
|
|
if error_queue:
|
|||
|
|
error_queue.put({
|
|||
|
|
'group_id': group_id,
|
|||
|
|
'process': 'ProcessA',
|
|||
|
|
'error': str(e),
|
|||
|
|
'time': datetime.now()
|
|||
|
|
})
|
|||
|
|
|
|||
|
|
|
|||
|
|
async def cut_evnt_video_publish_test():
|
|||
|
|
print("cut_evnt_video_publish_test")
|
|||
|
|
|
|||
|
|
#
|
|||
|
|
# async def cut_evnt_video_publish(
|
|||
|
|
# task_id: str,
|
|||
|
|
# mqtt: MQTTService,
|
|||
|
|
# mqtt_topic: str,
|
|||
|
|
# cancel_flag: asyncio.Event,
|
|||
|
|
# event_queue: Queue, # 修改为 multiprocessing.Queue
|
|||
|
|
# timestamp_frame_queue: Queue # 修改为 multiprocessing.Queue
|
|||
|
|
# ):
|
|||
|
|
# """事件视频切割与发布"""
|
|||
|
|
# print("cut_evnt_video_publish 启动")
|
|||
|
|
# loop = asyncio.get_running_loop()
|
|||
|
|
# upload_executor = ThreadPoolExecutor(max_workers=Config.UPLOAD_WORKERS)
|
|||
|
|
# time_start = time.time_ns()
|
|||
|
|
#
|
|||
|
|
# while not cancel_flag.is_set():
|
|||
|
|
# try:
|
|||
|
|
# # 使用 Queue.get() 的阻塞版本(需要处理空队列情况)
|
|||
|
|
# if event_queue.empty():
|
|||
|
|
# await asyncio.sleep(0.1) # 短暂等待避免CPU占用
|
|||
|
|
# continue
|
|||
|
|
#
|
|||
|
|
# # 从事件队列中获取事件(使用同步方式,因为 multiprocessing.Queue 不支持 async)
|
|||
|
|
# try:
|
|||
|
|
# # 以下代码需要适配 multiprocessing.Queue 的同步获取方式
|
|||
|
|
# # 由于 multiprocessing.Queue 没有 async 支持,需要改用同步获取
|
|||
|
|
# # 这里假设 event_queue 是 multiprocessing.Queue
|
|||
|
|
# # 需要通过 loop.run_in_executor 来同步获取
|
|||
|
|
# def get_event():
|
|||
|
|
# try:
|
|||
|
|
# return event_queue.get_nowait()
|
|||
|
|
# except queue.Empty:
|
|||
|
|
# return None
|
|||
|
|
#
|
|||
|
|
# event = await loop.run_in_executor(None, get_event)
|
|||
|
|
# if event is None:
|
|||
|
|
# await asyncio.sleep(0.1)
|
|||
|
|
# continue
|
|||
|
|
#
|
|||
|
|
# except Exception as e:
|
|||
|
|
# print(f"获取事件失败: {e}")
|
|||
|
|
# await asyncio.sleep(0.1)
|
|||
|
|
# continue
|
|||
|
|
#
|
|||
|
|
# # 检查事件有效性
|
|||
|
|
# if not isinstance(event, dict) or "timestamp" not in event:
|
|||
|
|
# print(f"无效事件,跳过")
|
|||
|
|
# continue
|
|||
|
|
#
|
|||
|
|
# timestamp = event.get("timestamp")
|
|||
|
|
# if timestamp is None:
|
|||
|
|
# print("⚠️ 警告:事件中缺少 timestamp")
|
|||
|
|
# continue
|
|||
|
|
#
|
|||
|
|
# # 收集帧数据(同步获取)
|
|||
|
|
# temp_list = []
|
|||
|
|
# count1=0
|
|||
|
|
# while not timestamp_frame_queue.empty():
|
|||
|
|
# try:
|
|||
|
|
# def get_frame():
|
|||
|
|
# try:
|
|||
|
|
#
|
|||
|
|
# print(f"timestamp_frame_queue.qsize() {timestamp_frame_queue.qsize()} {count1}")
|
|||
|
|
# return timestamp_frame_queue.get_nowait()
|
|||
|
|
# except queue.Empty:
|
|||
|
|
# return None
|
|||
|
|
#
|
|||
|
|
# item = await loop.run_in_executor(None, get_frame)
|
|||
|
|
# if item is None:
|
|||
|
|
# break
|
|||
|
|
# temp_list.append(item)
|
|||
|
|
# except Exception as e:
|
|||
|
|
# print(f"获取帧失败: {e}")
|
|||
|
|
# break
|
|||
|
|
#
|
|||
|
|
# if len(temp_list) < 1:
|
|||
|
|
# continue
|
|||
|
|
#
|
|||
|
|
# # 取第一个帧的时间戳作为基准
|
|||
|
|
# timestamp = temp_list[0]["timestamp"]
|
|||
|
|
# frames = [item["frame"] for item in temp_list]
|
|||
|
|
#
|
|||
|
|
# # 生成视频
|
|||
|
|
# video_bytes = frames_to_video_bytes(frames, fps=25, format="mp4")
|
|||
|
|
# if video_bytes is None:
|
|||
|
|
# print(f"⚠ ️警告:视频生成失败")
|
|||
|
|
# continue
|
|||
|
|
#
|
|||
|
|
# # 上传与发布
|
|||
|
|
# async def upload_and_publish():
|
|||
|
|
# def upload_minio():
|
|||
|
|
# return upload_video_buff_from_buffer(video_bytes, video_format="mp4")
|
|||
|
|
#
|
|||
|
|
# try:
|
|||
|
|
# minio_path, file_type = await loop.run_in_executor(upload_executor, upload_minio)
|
|||
|
|
# message = {
|
|||
|
|
# "task_id": task_id,
|
|||
|
|
# "time": str(datetime.now()),
|
|||
|
|
# "detection_id": timestamp,
|
|||
|
|
# "minio": {
|
|||
|
|
# "minio_path": minio_path,
|
|||
|
|
# "minio_origin_path": minio_path,
|
|||
|
|
# "file_type": file_type
|
|||
|
|
# },
|
|||
|
|
# "box_detail": [],
|
|||
|
|
# "osd_location": {},
|
|||
|
|
# "des_location": []
|
|||
|
|
# }
|
|||
|
|
# print(f"成功上传视频: {message}")
|
|||
|
|
# await mqtt.publish(mqtt_topic, json.dumps(message, ensure_ascii=False))
|
|||
|
|
# except Exception as e:
|
|||
|
|
# print(f"上传失败: {e}")
|
|||
|
|
#
|
|||
|
|
# await upload_and_publish()
|
|||
|
|
#
|
|||
|
|
# except Exception as e:
|
|||
|
|
# print(f"[cut循环] 处理事件出错: {e}")
|
|||
|
|
# await asyncio.sleep(0.1)
|
|||
|
|
# except asyncio.CancelledError:
|
|||
|
|
# print("cut_evnt_video_publish 收到取消信号")
|
|||
|
|
# raise
|
|||
|
|
#
|
|||
|
|
# print("cut_evnt_video_publish 线程已停止")
|
|||
|
|
|
|||
|
|
|
|||
|
|
async def cut_evnt_video_publish(
|
|||
|
|
task_id: str,
|
|||
|
|
mqtt: MQTTService,
|
|||
|
|
mqtt_topic: str,
|
|||
|
|
cancel_flag: asyncio.Event,
|
|||
|
|
event_queue: Queue,
|
|||
|
|
timestamp_frame_queue: Queue):
|
|||
|
|
"""事件视频切割与发布"""
|
|||
|
|
print("cut_evnt_video_publish 启动")
|
|||
|
|
loop = asyncio.get_running_loop()
|
|||
|
|
upload_executor = ThreadPoolExecutor(max_workers=Config.UPLOAD_WORKERS)
|
|||
|
|
time_start = time.time_ns()
|
|||
|
|
|
|||
|
|
def extract_all_frames_and_clear():
|
|||
|
|
"""原子性地提取队列中所有帧到列表并清空队列"""
|
|||
|
|
frames = []
|
|||
|
|
try:
|
|||
|
|
start_time = time.time()
|
|||
|
|
frame_count = 0
|
|||
|
|
|
|||
|
|
# 先获取队列大小
|
|||
|
|
qsize = timestamp_frame_queue.qsize()
|
|||
|
|
print(f"准备从队列获取 {qsize} 帧...")
|
|||
|
|
|
|||
|
|
# 批量获取
|
|||
|
|
temp_frames = []
|
|||
|
|
while frame_count < qsize:
|
|||
|
|
try:
|
|||
|
|
item = timestamp_frame_queue.get_nowait()
|
|||
|
|
temp_frames.append(item)
|
|||
|
|
frame_count += 1
|
|||
|
|
except queue.Empty:
|
|||
|
|
break
|
|||
|
|
|
|||
|
|
# 控制获取速度,避免CPU占用过高
|
|||
|
|
if frame_count % 100 == 0:
|
|||
|
|
time.sleep(0.001)
|
|||
|
|
|
|||
|
|
print(f"实际获取到 {len(temp_frames)} 帧,耗时 {time.time() - start_time:.3f}秒")
|
|||
|
|
|
|||
|
|
# 清空队列(确保所有元素都被取出)
|
|||
|
|
while True:
|
|||
|
|
try:
|
|||
|
|
timestamp_frame_queue.get_nowait()
|
|||
|
|
except queue.Empty:
|
|||
|
|
break
|
|||
|
|
|
|||
|
|
return temp_frames
|
|||
|
|
except Exception as e:
|
|||
|
|
print(f"提取帧失败: {e}")
|
|||
|
|
return []
|
|||
|
|
|
|||
|
|
while not cancel_flag.is_set():
|
|||
|
|
try:
|
|||
|
|
if event_queue.empty():
|
|||
|
|
await asyncio.sleep(0.1)
|
|||
|
|
continue
|
|||
|
|
|
|||
|
|
def get_event():
|
|||
|
|
try:
|
|||
|
|
return event_queue.get_nowait()
|
|||
|
|
except queue.Empty:
|
|||
|
|
return None
|
|||
|
|
|
|||
|
|
event = await loop.run_in_executor(None, get_event)
|
|||
|
|
if event is None:
|
|||
|
|
await asyncio.sleep(0.1)
|
|||
|
|
continue
|
|||
|
|
|
|||
|
|
if not isinstance(event, dict) or "timestamp" not in event:
|
|||
|
|
print(f"无效事件,跳过")
|
|||
|
|
continue
|
|||
|
|
|
|||
|
|
timestamp = event.get("timestamp")
|
|||
|
|
if timestamp is None:
|
|||
|
|
print("⚠️ 警告:事件中缺少 timestamp")
|
|||
|
|
continue
|
|||
|
|
|
|||
|
|
temp_list = extract_all_frames_and_clear()
|
|||
|
|
|
|||
|
|
if len(temp_list) < 1:
|
|||
|
|
print("未获取到帧数据,跳过")
|
|||
|
|
continue
|
|||
|
|
|
|||
|
|
print(f"成功获取 {len(temp_list)} 帧用于视频生成")
|
|||
|
|
|
|||
|
|
timestamp = temp_list[0]["timestamp"]
|
|||
|
|
frames = [item["frame"] for item in temp_list]
|
|||
|
|
|
|||
|
|
video_bytes = frames_to_video_bytes(frames, fps=25, format="mp4")
|
|||
|
|
if video_bytes is None:
|
|||
|
|
print(f"⚠ ️警告:视频生成失败")
|
|||
|
|
continue
|
|||
|
|
|
|||
|
|
async def upload_and_publish():
|
|||
|
|
def upload_minio():
|
|||
|
|
return upload_video_buff_from_buffer(video_bytes, video_format="mp4")
|
|||
|
|
|
|||
|
|
try:
|
|||
|
|
minio_path, file_type = await loop.run_in_executor(upload_executor, upload_minio)
|
|||
|
|
message = {
|
|||
|
|
"task_id": task_id,
|
|||
|
|
"time": str(datetime.now()),
|
|||
|
|
"detection_id": timestamp,
|
|||
|
|
"minio": {
|
|||
|
|
"minio_path": minio_path,
|
|||
|
|
"minio_origin_path": minio_path,
|
|||
|
|
"file_type": file_type
|
|||
|
|
},
|
|||
|
|
"box_detail": [],
|
|||
|
|
"osd_location": {},
|
|||
|
|
"des_location": []
|
|||
|
|
}
|
|||
|
|
print(f"成功上传视频: {message}")
|
|||
|
|
await mqtt.publish(mqtt_topic, json.dumps(message, ensure_ascii=False))
|
|||
|
|
except Exception as e:
|
|||
|
|
print(f"上传失败: {e}")
|
|||
|
|
|
|||
|
|
await upload_and_publish()
|
|||
|
|
|
|||
|
|
except Exception as e:
|
|||
|
|
print(f"[cut循环] 处理事件出错: {e}")
|
|||
|
|
await asyncio.sleep(0.1)
|
|||
|
|
except asyncio.CancelledError:
|
|||
|
|
print("cut_evnt_video_publish 收到取消信号")
|
|||
|
|
raise
|
|||
|
|
|
|||
|
|
print("cut_evnt_video_publish 线程已停止")
|
|||
|
|
|
|||
|
|
|
|||
|
|
|
|||
|
|
|
|||
|
|
# async def cut_evnt_video_publish(task_id, mqtt, mqtt_topic, cancel_flag: asyncio.Event,
|
|||
|
|
# # event_queue: asyncio.Queue = None,
|
|||
|
|
# event_queue: Queue = None,
|
|||
|
|
# timestamp_frame_queue: Queue = None):
|
|||
|
|
# print("cut_evnt_video_publishcut_evnt_video_publish")
|
|||
|
|
# loop = asyncio.get_running_loop()
|
|||
|
|
# upload_executor = ThreadPoolExecutor(max_workers=Config.UPLOAD_WORKERS)
|
|||
|
|
# time_start = time.time_ns()
|
|||
|
|
# while not cancel_flag.is_set():
|
|||
|
|
# print("cut_evnt_video_publishcut_evnt_video_publish111111")
|
|||
|
|
#
|
|||
|
|
# try:
|
|||
|
|
# # 从事件队列中获取事件(带超时,避免阻塞)
|
|||
|
|
# try:
|
|||
|
|
# event = await asyncio.wait_for(event_queue.get(), timeout=1.0)
|
|||
|
|
# except asyncio.TimeoutError:
|
|||
|
|
# continue # 队列为空,继续下一轮循环
|
|||
|
|
# #
|
|||
|
|
# # if not isinstance(event, dict) or "timestamp" not in event:
|
|||
|
|
# # print(f"[cut循环] 无效事件,跳过")
|
|||
|
|
# # continue
|
|||
|
|
# #
|
|||
|
|
# timestamp = event.get("timestamp")
|
|||
|
|
# if timestamp is None:
|
|||
|
|
# print("⚠️ 警告:事件中缺少 timestamp")
|
|||
|
|
# continue
|
|||
|
|
# # 以下是测试代码,用作录像测试
|
|||
|
|
# # 基于frame_len 控制生成视频长度
|
|||
|
|
# # matched_items = timestamp_frame_queue.query_by_timestamp(timestamp, 150)
|
|||
|
|
# # print(f"[cut循环] 匹配到 {len(matched_items)} 个帧")
|
|||
|
|
# # if len(matched_items) == 0:
|
|||
|
|
# # continue
|
|||
|
|
#
|
|||
|
|
# time_end = time.time_ns()
|
|||
|
|
# # time.sleep(3)
|
|||
|
|
# print(f"cut_evnt_video_publish {timestamp_frame_queue.qsize()}")
|
|||
|
|
# # if time_end - time_start < 5000000000:
|
|||
|
|
# # continue
|
|||
|
|
# time_start = time_end
|
|||
|
|
# temp_list = []
|
|||
|
|
# while not timestamp_frame_queue.empty():
|
|||
|
|
# try:
|
|||
|
|
# item = timestamp_frame_queue.get_nowait() # 非阻塞获取
|
|||
|
|
# temp_list.append(item)
|
|||
|
|
# except: # 如果队列为空或出错,跳过
|
|||
|
|
# break
|
|||
|
|
# if len(temp_list) <1:
|
|||
|
|
# continue
|
|||
|
|
#
|
|||
|
|
# timestamp=temp_list[0]["timestamp"]
|
|||
|
|
#
|
|||
|
|
# frames = [item["frame"] for item in temp_list]
|
|||
|
|
# if not frames: # 显式检查帧列表是否为空
|
|||
|
|
# print("⚠️ 警告:匹配到的帧列表为空")
|
|||
|
|
# continue
|
|||
|
|
#
|
|||
|
|
# video_bytes = frames_to_video_bytes(frames, fps=25, format="mp4")
|
|||
|
|
# if video_bytes is None: # 检查返回值
|
|||
|
|
# print(f"⚠ ️警告:视频生成失败,返回None {len(frames)}")
|
|||
|
|
# continue
|
|||
|
|
#
|
|||
|
|
# async def upload_and_publish():
|
|||
|
|
# def upload_minio():
|
|||
|
|
# return upload_video_buff_from_buffer(video_bytes, video_format="mp4")
|
|||
|
|
#
|
|||
|
|
# try:
|
|||
|
|
# minio_path, file_type = await loop.run_in_executor(upload_executor, upload_minio)
|
|||
|
|
# # message = {"detection_id": timestamp, "minio_path": minio_path}
|
|||
|
|
# message = {
|
|||
|
|
# "task_id": task_id,
|
|||
|
|
# "time": str(datetime.now()),
|
|||
|
|
# "detection_id": timestamp,
|
|||
|
|
# "minio": {"minio_path": minio_path,
|
|||
|
|
# "minio_origin_path": minio_path,
|
|||
|
|
# "file_type": file_type},
|
|||
|
|
# "box_detail": [{
|
|||
|
|
#
|
|||
|
|
# }],
|
|||
|
|
# "osd_location": {
|
|||
|
|
# },
|
|||
|
|
# "des_location": []
|
|||
|
|
# }
|
|||
|
|
# print(f"成功上传视频: {message}")
|
|||
|
|
# await mqtt.publish(mqtt_topic, json.dumps(message, ensure_ascii=False))
|
|||
|
|
# except Exception as e:
|
|||
|
|
# print(f"上传失败: {e}")
|
|||
|
|
#
|
|||
|
|
# asyncio.create_task(upload_and_publish())
|
|||
|
|
#
|
|||
|
|
# except Exception as e:
|
|||
|
|
# print(f"[cut循环] 处理事件出错2: {e}")
|
|||
|
|
# finally:
|
|||
|
|
# # if event is not None:
|
|||
|
|
# # event_queue.task_done()
|
|||
|
|
# # print(f"[cut循环] 确认消费,剩余未完成任务: {event_queue._unfinished_tasks}")
|
|||
|
|
#
|
|||
|
|
# if event_queue.qsize() > 0 and event_queue._unfinished_tasks == 0:
|
|||
|
|
# try:
|
|||
|
|
# event_queue.put_nowait(None)
|
|||
|
|
# dummy = await event_queue.get()
|
|||
|
|
# event_queue.task_done()
|
|||
|
|
# except:
|
|||
|
|
# pass
|
|||
|
|
|
|||
|
|
|
|||
|
|
def get_local_drc_message():
|
|||
|
|
global local_drc_message
|
|||
|
|
if local_drc_message is not None:
|
|||
|
|
mess = local_drc_message
|
|||
|
|
local_drc_message = None
|
|||
|
|
return mess
|
|||
|
|
return None
|
|||
|
|
|
|||
|
|
|
|||
|
|
async def send_frame_to_s3_mq(loop, upload_executor, task_id, mqtt, mqtt_topic, cancel_flag: asyncio.Event,
|
|||
|
|
cv_frame_queue: asyncio.Queue,
|
|||
|
|
event_queue: asyncio.Queue = None,
|
|||
|
|
device_height: float = float(200), repeat_dis: float = -1, repeat_time: float = -1):
|
|||
|
|
global stats
|
|||
|
|
start_time = time.time()
|
|||
|
|
print("send_frame_to_s3_mq")
|
|||
|
|
# executor = ThreadPoolExecutor(max_workers=Config.MAX_WORKERS)
|
|||
|
|
# upload_executor = ThreadPoolExecutor(max_workers=Config.UPLOAD_WORKERS)
|
|||
|
|
track_filter = TrackIDEventFilter(max_inactive_time=10)
|
|||
|
|
reported_track_ids = defaultdict(float)
|
|||
|
|
count_pic = 0
|
|||
|
|
report_interval = 8
|
|||
|
|
# loop = asyncio.get_running_loop()
|
|||
|
|
local_func_cache = {
|
|||
|
|
"func_100000": None,
|
|||
|
|
"func_100004": None, # 存储缓存,缓存人员track_id
|
|||
|
|
"func_100006": None # 存储缓存,缓存车辆track_id
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
para = {
|
|||
|
|
"category": 3
|
|||
|
|
}
|
|||
|
|
target_location_back = [] # 本地缓存,用作位置重复计算
|
|||
|
|
current_time_second = int(time.time())
|
|||
|
|
repeat_time_count = 0
|
|||
|
|
while not cancel_flag.is_set():
|
|||
|
|
upload_start = time.time()
|
|||
|
|
try:
|
|||
|
|
# 检查队列长度,避免堆积
|
|||
|
|
if cv_frame_queue.qsize() > Config.PROCESSED_QUEUE_SIZE // 2:
|
|||
|
|
print(f"警告:cv_frame_queue 积压(当前长度={cv_frame_queue.qsize()}),清空队列")
|
|||
|
|
while not cv_frame_queue.empty():
|
|||
|
|
await cv_frame_queue.get()
|
|||
|
|
print("队列已清空,等待新数据...")
|
|||
|
|
await asyncio.sleep(0.1)
|
|||
|
|
continue
|
|||
|
|
|
|||
|
|
# 获取队列数据
|
|||
|
|
try:
|
|||
|
|
cv_frame = await asyncio.wait_for(cv_frame_queue.get(), timeout=0.05)
|
|||
|
|
# 检查数据类型
|
|||
|
|
if not isinstance(cv_frame, dict):
|
|||
|
|
print(f"⚠️ 警告:cv_frame 不是字典,而是 {type(cv_frame)}")
|
|||
|
|
continue
|
|||
|
|
except asyncio.TimeoutError:
|
|||
|
|
continue
|
|||
|
|
if repeat_time > 0: # 基于时间的重复计算,是否使能
|
|||
|
|
read_cv_time_second = int(time.time())
|
|||
|
|
# print(f"read_cv_time_secondread_cv_time_second {read_cv_time_second}")
|
|||
|
|
if read_cv_time_second - current_time_second < repeat_time and repeat_time_count > 0:
|
|||
|
|
# print("触发事件去重")
|
|||
|
|
continue
|
|||
|
|
else:
|
|||
|
|
print("没有触发事件去重")
|
|||
|
|
repeat_time_count += 1 # 防止丢失第一个目标
|
|||
|
|
current_time_second = read_cv_time_second
|
|||
|
|
# 准备数据
|
|||
|
|
frame_copy = cv_frame['frame_copy']
|
|||
|
|
frame = cv_frame['frame']
|
|||
|
|
target_point = cv_frame['target_point']
|
|||
|
|
detections = cv_frame['detections']
|
|||
|
|
detections_list = cv_frame['detections_list']
|
|||
|
|
model_para_list = cv_frame.get('model_para', {}) # 默认空字典
|
|||
|
|
timestamp = cv_frame.get('timestamp', time.time_ns()) # 默认空字典
|
|||
|
|
air_alti = cv_frame['osd_info']
|
|||
|
|
|
|||
|
|
img_height, img_width = frame.shape[:2]
|
|||
|
|
gimbal_yaw = air_alti.gimbal_yaw
|
|||
|
|
gimbal_pitch = air_alti.gimbal_pitch
|
|||
|
|
gimbal_roll = air_alti.gimbal_roll
|
|||
|
|
height = air_alti.height
|
|||
|
|
cam_longitude = air_alti.longitude
|
|||
|
|
cam_latitude = air_alti.latitude
|
|||
|
|
# 初始化默认值
|
|||
|
|
frame11 = frame_copy # 默认使用原始帧
|
|||
|
|
box_detail = [] # 默认空列表
|
|||
|
|
|
|||
|
|
current_time = time.time()
|
|||
|
|
h = device_height
|
|||
|
|
us = []
|
|||
|
|
vs = []
|
|||
|
|
heights = []
|
|||
|
|
should_report = True
|
|||
|
|
print(f"target_pointtarget_point {len(target_point)}")
|
|||
|
|
count_item = 0
|
|||
|
|
|
|||
|
|
des_location_result = []
|
|||
|
|
for item in target_point:
|
|||
|
|
# # 跳过无效的track_id
|
|||
|
|
# 检查是否应该上报该track_id
|
|||
|
|
u = item["u"]
|
|||
|
|
v = item["v"]
|
|||
|
|
cls_id = item["cls_id"]
|
|||
|
|
track_id = item["track_id"]
|
|||
|
|
new_track_id = item["new_track_id"]
|
|||
|
|
|
|||
|
|
# should_report = True
|
|||
|
|
|
|||
|
|
# # 如果这个track_id已经上报过,检查是否超过上报间隔
|
|||
|
|
# if new_track_id in reported_track_ids:
|
|||
|
|
# last_report_time = reported_track_ids[new_track_id]
|
|||
|
|
# if current_time - last_report_time < report_interval:
|
|||
|
|
# print(f"基于track_id,触发去重事件:{new_track_id}")
|
|||
|
|
# should_report = False
|
|||
|
|
# print(f"new_track_idnew_track_id {new_track_id} {track_id}")
|
|||
|
|
# if should_report and track_filter.should_report(new_track_id):
|
|||
|
|
# should_report = True
|
|||
|
|
print(f"方法 send_frame——to {count_item}")
|
|||
|
|
count_item += 1
|
|||
|
|
if not track_filter.should_report(track_id):
|
|||
|
|
should_report = False
|
|||
|
|
break
|
|||
|
|
# if track_id < 0: # 适配MultiYOLODetector类,该类不支持追踪,默认track_id为-1
|
|||
|
|
# should_report = True
|
|||
|
|
|
|||
|
|
# 如果使用TrackIDEventFilter判断需要上报
|
|||
|
|
if should_report:
|
|||
|
|
us.append(u)
|
|||
|
|
vs.append(v)
|
|||
|
|
heights.append(h)
|
|||
|
|
# location_results = cal_canv_location_by_osd(us, vs, gimbal_pitch, gimbal_yaw, gimbal_roll,
|
|||
|
|
# height, cam_longitude, cam_latitude, img_width, img_height,
|
|||
|
|
# heights)
|
|||
|
|
#
|
|||
|
|
# if not location_results:
|
|||
|
|
# print("location_results is null")
|
|||
|
|
# continue
|
|||
|
|
if len(us) > 0:
|
|||
|
|
print("进行侵限计算")
|
|||
|
|
location_results = cal_canv_location_by_osd(us, vs, gimbal_pitch, gimbal_yaw, gimbal_roll,
|
|||
|
|
height, cam_longitude, cam_latitude, img_width, img_height,
|
|||
|
|
heights)
|
|||
|
|
if not location_results:
|
|||
|
|
continue
|
|||
|
|
|
|||
|
|
if location_results:
|
|||
|
|
des1 = location_results[0]
|
|||
|
|
des1_longitude = des1[0]
|
|||
|
|
des1_latitude = des1[1]
|
|||
|
|
des1_height = des1[2]
|
|||
|
|
des_location_result.append({"longitude": des1_longitude,
|
|||
|
|
"latitude": des1_latitude})
|
|||
|
|
|
|||
|
|
repeat_state = False
|
|||
|
|
show_des = 0
|
|||
|
|
str_loca = ""
|
|||
|
|
if repeat_dis > 0: # ai_model_list repeat_dis 字段大于零,才启用去重
|
|||
|
|
if len(target_location_back) > 0: # 当前逻辑并不严谨,只是比较了第一个位置信息
|
|||
|
|
des1_back = target_location_back[0]
|
|||
|
|
des1_back_longitude = des1_back[0]
|
|||
|
|
des1_back_latitude = des1_back[1]
|
|||
|
|
des1_back_height = des1_back[2]
|
|||
|
|
|
|||
|
|
des1 = location_results[0]
|
|||
|
|
des1_longitude = des1[0]
|
|||
|
|
des1_latitude = des1[1]
|
|||
|
|
des1_height = des1[2]
|
|||
|
|
|
|||
|
|
des_latitude = []
|
|||
|
|
|
|||
|
|
str_loca = f"{des1_back_longitude}:{des1_back_latitude}---{des1_longitude}{des1_latitude}"
|
|||
|
|
des = haversine(des1_back_longitude, des1_back_latitude, des1_longitude, des1_latitude)
|
|||
|
|
show_des = des
|
|||
|
|
if des < repeat_dis:
|
|||
|
|
print(f"触发基于坐标判断重复,坐标距离:{des}")
|
|||
|
|
repeat_state = True
|
|||
|
|
else:
|
|||
|
|
target_location_back = location_results # 未触发去重逻辑,即更新本地位置缓存
|
|||
|
|
else:
|
|||
|
|
target_location_back = location_results # 基于坐标位置去重判断,如果失败就缓存本次位置信息
|
|||
|
|
if not repeat_state: # 未触发去重逻辑,即执行图像上传逻辑
|
|||
|
|
for idx, model_para in enumerate(model_para_list):
|
|||
|
|
if not isinstance(detections_list, list) or idx >= len(detections_list):
|
|||
|
|
continue
|
|||
|
|
detections = detections_list[idx] # 正确获取当前模型的检测结果
|
|||
|
|
chinese_label = model_para.get("model_chinese_labe", {})
|
|||
|
|
model_cls = model_para.get("model_cls_index", {})
|
|||
|
|
list_func_id = model_para.get("model_list_func_id", -11)
|
|||
|
|
func_id = model_para.get("func_id", [])
|
|||
|
|
|
|||
|
|
# 获取DRC消息(同步操作,放到线程池)
|
|||
|
|
local_drc_message = await loop.run_in_executor(upload_executor, get_local_drc_message)
|
|||
|
|
|
|||
|
|
if detections is None or len(detections.boxes) < 1:
|
|||
|
|
continue
|
|||
|
|
|
|||
|
|
try:
|
|||
|
|
# 图像处理和结果计算
|
|||
|
|
frame11, box_detail1 = await loop.run_in_executor(
|
|||
|
|
upload_executor,
|
|||
|
|
cal_tricker_results,
|
|||
|
|
frame_copy, detections, None,
|
|||
|
|
func_id, local_func_cache, para, model_cls, chinese_label, list_func_id
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
print(f"cal_tricker_results")
|
|||
|
|
print(f"func_id {func_id}")
|
|||
|
|
print(f"local_func_cache {local_func_cache}")
|
|||
|
|
print(f"para {para}")
|
|||
|
|
print(f"model_cls {model_cls}")
|
|||
|
|
print(f"chinese_label {chinese_label}")
|
|||
|
|
print(f"list_func_id {list_func_id}")
|
|||
|
|
print(f"box_detail1 {len(box_detail1)}")
|
|||
|
|
|
|||
|
|
except Exception as e:
|
|||
|
|
print(f"处理帧时出错: {str(e)}")
|
|||
|
|
import traceback
|
|||
|
|
traceback.print_exc()
|
|||
|
|
continue
|
|||
|
|
|
|||
|
|
count_pic = count_pic + 1
|
|||
|
|
|
|||
|
|
# 图像编码
|
|||
|
|
def encode_frame():
|
|||
|
|
success, buffer = cv2.imencode(".jpg", frame11)
|
|||
|
|
return buffer.tobytes() if success else None
|
|||
|
|
|
|||
|
|
def encode_origin_frame():
|
|||
|
|
success, buffer = cv2.imencode(".jpg", frame)
|
|||
|
|
return buffer.tobytes() if success else None
|
|||
|
|
|
|||
|
|
buffer_bytes = await loop.run_in_executor(upload_executor, encode_frame)
|
|||
|
|
buffer_origin_bytes = await loop.run_in_executor(upload_executor, encode_origin_frame)
|
|||
|
|
if not buffer_bytes:
|
|||
|
|
continue
|
|||
|
|
|
|||
|
|
# 并行处理上传和MQTT发布
|
|||
|
|
async def upload_and_publish():
|
|||
|
|
# 上传到MinIO
|
|||
|
|
def upload_minio():
|
|||
|
|
minio_path, file_type = upload_frame_buff_from_buffer(buffer_bytes, None)
|
|||
|
|
minio_origin_path, file_type = upload_frame_buff_from_buffer(buffer_origin_bytes, None)
|
|||
|
|
return minio_path, minio_origin_path, file_type
|
|||
|
|
|
|||
|
|
minio_path, minio_origin_path, file_type = await loop.run_in_executor(
|
|||
|
|
upload_executor, upload_minio
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
# 构造消息
|
|||
|
|
message = {
|
|||
|
|
"task_id": task_id,
|
|||
|
|
"time": str(datetime.now()),
|
|||
|
|
"detection_id": timestamp,
|
|||
|
|
"minio": {"minio_path": minio_path, "minio_origin_path": minio_origin_path,
|
|||
|
|
"file_type": file_type},
|
|||
|
|
"box_detail": box_detail1,
|
|||
|
|
"uav_location": local_drc_message,
|
|||
|
|
"osd_location": {
|
|||
|
|
"longitude": cam_longitude,
|
|||
|
|
"latitude": cam_latitude
|
|||
|
|
},
|
|||
|
|
"des_location": des_location_result
|
|||
|
|
}
|
|||
|
|
await event_queue.put({
|
|||
|
|
"timestamp": timestamp # 存储事件触发的时刻,用作视频制作
|
|||
|
|
})
|
|||
|
|
message_json = json.dumps(message, ensure_ascii=False)
|
|||
|
|
await mqtt.publish(mqtt_topic, message_json)
|
|||
|
|
|
|||
|
|
asyncio.create_task(upload_and_publish())
|
|||
|
|
# 使用共享变量时加锁,进而进行跳帧,不然上报太频繁
|
|||
|
|
# async with cache_lock:
|
|||
|
|
# # 读取或修改共享变量
|
|||
|
|
# send_count = shared_local_cache["send_count"]
|
|||
|
|
# send_count = send_count + 1
|
|||
|
|
# shared_local_cache["send_count"] = send_count
|
|||
|
|
#
|
|||
|
|
# # 伪代码,目前安全帽比较难识别到
|
|||
|
|
# if 100014 == model_para_list[0]["model_list_func_id"]:
|
|||
|
|
# if send_count > 2:
|
|||
|
|
# # 创建独立任务执行上传和发布
|
|||
|
|
# shared_local_cache["send_count"] = 0
|
|||
|
|
# asyncio.create_task(upload_and_publish())
|
|||
|
|
# else:
|
|||
|
|
# if send_count > 30:
|
|||
|
|
# # 创建独立任务执行上传和发布
|
|||
|
|
# shared_local_cache["send_count"] = 0
|
|||
|
|
# asyncio.create_task(upload_and_publish())
|
|||
|
|
if 'frame' in locals():
|
|||
|
|
upload_time = time.time() - upload_start
|
|||
|
|
print(f"上传耗时: {upload_time:.4f}s")
|
|||
|
|
except Exception as e:
|
|||
|
|
print(f"send_frame_to_s3_mq 错误: {e}")
|
|||
|
|
import traceback
|
|||
|
|
traceback.print_exc()
|
|||
|
|
await asyncio.sleep(0.1)
|
|||
|
|
except asyncio.CancelledError:
|
|||
|
|
print("send_frame_to_s3 收到取消信号")
|
|||
|
|
raise
|
|||
|
|
# finally:
|
|||
|
|
# log_perf('send_frame_to_s3_mq', start_time)
|
|||
|
|
|
|||
|
|
# 更新性能统计
|
|||
|
|
stats['processed'] += 1
|
|||
|
|
if time.time() - stats['last_time'] >= 1.0:
|
|||
|
|
stats['avg_fps'] = stats['processed'] / (time.time() - stats['last_time'])
|
|||
|
|
print(f"处理速度: {stats['avg_fps']:.2f} FPS")
|
|||
|
|
stats['processed'] = 0
|
|||
|
|
stats['last_time'] = time.time()
|
|||
|
|
print("send_frame_to_s3线程已停止")
|
|||
|
|
|
|||
|
|
|
|||
|
|
async def cal_des_invade(loop, invade_executor, task_id: str, mqtt, mqtt_publish_topic,
|
|||
|
|
list_points: list[list[any]], camera_para: Camera_Para, model_count: int,
|
|||
|
|
cancel_flag: asyncio.Event = None, invade_queue: asyncio.Queue = None,
|
|||
|
|
event_queue: asyncio.Queue = None,
|
|||
|
|
device_height: float = float(200), repeat_dis: float = -1, repeat_time: float = -1):
|
|||
|
|
# loop = asyncio.get_running_loop()
|
|||
|
|
# upload_executor = ThreadPoolExecutor(max_workers=Config.UPLOAD_WORKERS)
|
|||
|
|
print("cal_des_invade")
|
|||
|
|
pic_count_hongxian = 0
|
|||
|
|
track_filter = TrackIDEventFilter(max_inactive_time=8.0)
|
|||
|
|
# 用于记录已上报的track_id及其上报时间
|
|||
|
|
reported_track_ids = defaultdict(float)
|
|||
|
|
# 上报间隔时间(秒)
|
|||
|
|
report_interval = 8
|
|||
|
|
target_location_back = [] # 本地缓存,用作位置重复计算
|
|||
|
|
current_time_second = int(time.time())
|
|||
|
|
while not cancel_flag.is_set():
|
|||
|
|
# 检查队列长度,避免堆积
|
|||
|
|
if invade_queue.qsize() > Config.PROCESSED_QUEUE_SIZE // 2:
|
|||
|
|
print(f"警告:invade_queue 积压(当前长度={invade_queue.qsize()}),清空队列")
|
|||
|
|
while not invade_queue.empty():
|
|||
|
|
await invade_queue.get()
|
|||
|
|
print("队列已清空,等待新数据...")
|
|||
|
|
await asyncio.sleep(0.1)
|
|||
|
|
continue
|
|||
|
|
# 获取队列数据
|
|||
|
|
try:
|
|||
|
|
cv_frame = await asyncio.wait_for(invade_queue.get(), timeout=0.8)
|
|||
|
|
# 检查数据类型
|
|||
|
|
if not isinstance(cv_frame, dict):
|
|||
|
|
print(f"⚠️ 警告:cv_frame 不是字典,而是 {type(cv_frame)}")
|
|||
|
|
continue
|
|||
|
|
except asyncio.TimeoutError:
|
|||
|
|
print(f"cal_des_invade TimeoutError")
|
|||
|
|
continue
|
|||
|
|
except asyncio.CancelledError:
|
|||
|
|
print("cal_des_invade 收到取消信号")
|
|||
|
|
raise
|
|||
|
|
if cv_frame is None:
|
|||
|
|
continue
|
|||
|
|
|
|||
|
|
if repeat_time > 0: # 基于时间的重复计算,是否使能
|
|||
|
|
read_cv_time_second = int(time.time())
|
|||
|
|
|
|||
|
|
if read_cv_time_second - current_time_second < repeat_time:
|
|||
|
|
continue
|
|||
|
|
else:
|
|||
|
|
current_time_second = read_cv_time_second
|
|||
|
|
|
|||
|
|
# print("cal_des_invade inside")
|
|||
|
|
frame_copy = cv_frame['frame_copy']
|
|||
|
|
frame = cv_frame['frame']
|
|||
|
|
target_point = cv_frame['target_point']
|
|||
|
|
message_point = cv_frame['message_point']
|
|||
|
|
cls_count = cv_frame['cls_count']
|
|||
|
|
model_list_func_id = cv_frame['model_list_func_id']
|
|||
|
|
model_func_id = cv_frame['model_func_id']
|
|||
|
|
|
|||
|
|
air_alti = cv_frame['osd_info']
|
|||
|
|
detections = cv_frame['detections']
|
|||
|
|
detections_list = cv_frame['detections_list']
|
|||
|
|
model_para_list = cv_frame.get('model_para', {}) # 默认空字典
|
|||
|
|
timestamp = cv_frame.get('timestamp', time.time_ns()) # 默认空字典
|
|||
|
|
if not isinstance(frame, np.ndarray) or frame.size == 0:
|
|||
|
|
print("⚠️ 警告:frame不是有效的numpy数组")
|
|||
|
|
continue
|
|||
|
|
|
|||
|
|
# 获取宽高
|
|||
|
|
img_height, img_width = frame.shape[:2]
|
|||
|
|
if not air_alti:
|
|||
|
|
continue
|
|||
|
|
|
|||
|
|
gimbal_yaw = air_alti.gimbal_yaw
|
|||
|
|
gimbal_pitch = air_alti.gimbal_pitch
|
|||
|
|
gimbal_roll = air_alti.gimbal_roll
|
|||
|
|
height = air_alti.height
|
|||
|
|
cam_longitude = air_alti.longitude
|
|||
|
|
cam_latitude = air_alti.latitude
|
|||
|
|
|
|||
|
|
try:
|
|||
|
|
current_time = time.time()
|
|||
|
|
h = device_height
|
|||
|
|
us = []
|
|||
|
|
vs = []
|
|||
|
|
heights = []
|
|||
|
|
|
|||
|
|
should_report = True
|
|||
|
|
for item in target_point:
|
|||
|
|
# # 跳过无效的track_id
|
|||
|
|
# 检查是否应该上报该track_id
|
|||
|
|
u = item["u"]
|
|||
|
|
v = item["v"]
|
|||
|
|
cls_id = item["cls_id"]
|
|||
|
|
track_id = item["track_id"]
|
|||
|
|
new_track_id = item["new_track_id"]
|
|||
|
|
|
|||
|
|
# # 如果这个track_id已经上报过,检查是否超过上报间隔
|
|||
|
|
# # if new_track_id in reported_track_ids:
|
|||
|
|
# if track_id in reported_track_ids:
|
|||
|
|
# last_report_time = reported_track_ids[track_id]
|
|||
|
|
# if current_time - last_report_time < report_interval:
|
|||
|
|
# print(f"基于track_id,触发去重事件:{track_id}")
|
|||
|
|
# should_report = False
|
|||
|
|
print(f"方法 cal_des_invade")
|
|||
|
|
if not track_filter.should_report(track_id):
|
|||
|
|
should_report = False
|
|||
|
|
break
|
|||
|
|
|
|||
|
|
# if track_id < 0: # 适配MultiYOLODetector类,该类不支持追踪,默认track_id为-1
|
|||
|
|
# should_report = True
|
|||
|
|
|
|||
|
|
# 如果使用TrackIDEventFilter判断需要上报
|
|||
|
|
if should_report:
|
|||
|
|
us.append(u)
|
|||
|
|
vs.append(v)
|
|||
|
|
heights.append(h)
|
|||
|
|
|
|||
|
|
if should_report:
|
|||
|
|
print("进行侵限计算")
|
|||
|
|
location_results = cal_canv_location_by_osd(us, vs, gimbal_pitch, gimbal_yaw, gimbal_roll,
|
|||
|
|
height, cam_longitude, cam_latitude, img_width, img_height,
|
|||
|
|
heights)
|
|||
|
|
if not location_results:
|
|||
|
|
continue
|
|||
|
|
# for point in location_results:
|
|||
|
|
# target_location.append({point[1], point[2]})
|
|||
|
|
# point_list = [] # 整理红线集合
|
|||
|
|
repeat_state = False
|
|||
|
|
show_des = 0
|
|||
|
|
str_loca = ""
|
|||
|
|
des_location_result = []
|
|||
|
|
if repeat_dis > 0: # ai_model_list repeat_dis 字段大于零,才启用去重
|
|||
|
|
if len(target_location_back) > 0: # 当前逻辑并不严谨,只是比较了第一个位置信息
|
|||
|
|
des1_back = target_location_back[0]
|
|||
|
|
des1_back_longitude = des1_back[0]
|
|||
|
|
des1_back_latitude = des1_back[1]
|
|||
|
|
des1_back_height = des1_back[2]
|
|||
|
|
|
|||
|
|
des1 = location_results[0]
|
|||
|
|
des1_longitude = des1[0]
|
|||
|
|
des1_latitude = des1[1]
|
|||
|
|
|
|||
|
|
des_location_result.append({"longitude": des1_longitude,
|
|||
|
|
"latitude": des1_latitude})
|
|||
|
|
des1_height = des1[2]
|
|||
|
|
str_loca = f"{des1_back_longitude}:{des1_back_latitude}---{des1_longitude}{des1_latitude}"
|
|||
|
|
des = haversine(des1_back_longitude, des1_back_latitude, des1_longitude, des1_latitude)
|
|||
|
|
show_des = des
|
|||
|
|
if des < repeat_dis:
|
|||
|
|
print(f"触发基于坐标判断重复,坐标距离:{des}")
|
|||
|
|
repeat_state = True
|
|||
|
|
else:
|
|||
|
|
target_location_back = location_results # 未触发去重逻辑,即更新本地位置缓存
|
|||
|
|
else:
|
|||
|
|
target_location_back = location_results # 基于坐标位置去重判断,如果失败就缓存本次位置信息
|
|||
|
|
# 测试红线
|
|||
|
|
# for point in results:
|
|||
|
|
# point_list.append([point["u"], point["v"]])
|
|||
|
|
# for point in message_point:
|
|||
|
|
# cv2.rectangle(frame_copy, (point["box"][0], point["box"][1]),
|
|||
|
|
# (point["box"][2], point["box"][3]), (0, 255, 255), 2)
|
|||
|
|
# cv2.polylines(frame_copy, [np.array(point_list, dtype=np.int32)],
|
|||
|
|
# isClosed=True, color=(0, 0, 255), thickness=2)
|
|||
|
|
pic_count_hongxian = pic_count_hongxian + 1
|
|||
|
|
# cv2.imwrite(f"save_pic\invade-hongxian\hongxianongly-{pic_count_hongxian}.jpg", frame_copy)
|
|||
|
|
|
|||
|
|
# if len(invade_point) > 0:
|
|||
|
|
# print("hongxianhongxianhongxianhongxian")
|
|||
|
|
# pic_count = pic_count + 1
|
|||
|
|
# # cv2.imwrite(f"save_pic\invade\hongxian-{pic_count}.jpg", frame)
|
|||
|
|
# drawn_frame = frame_copy # 关键修复:深拷贝绘制后的帧
|
|||
|
|
# 图像编码
|
|||
|
|
|
|||
|
|
if not repeat_state: # 未触发去重逻辑,即执行图像上传逻辑
|
|||
|
|
print(f"未发现重复,现在上传{str_loca}======{show_des}")
|
|||
|
|
|
|||
|
|
def encode_origin_frame():
|
|||
|
|
success, buffer = cv2.imencode(".jpg", frame)
|
|||
|
|
return buffer.tobytes() if success else None
|
|||
|
|
|
|||
|
|
def encode_frame():
|
|||
|
|
success, buffer = cv2.imencode(".jpg", frame_copy)
|
|||
|
|
# if task_id == "2a5d7a80-109a-4cd6-aa95-0e8c9aab6b3f-1": # 测试安全帽
|
|||
|
|
# cv2.imwrite(f"save_pic\qm\hongxian-encode-{pic_count_hongxian}.jpg", frame)
|
|||
|
|
#
|
|||
|
|
# if task_id == "7eecadd6-001f-488c-bed9-1086079c3450-1": # 测试工地车辆
|
|||
|
|
# cv2.imwrite(f"save_pic\gdcl\hongxian-encode-{pic_count_hongxian}.jpg", frame_copy)
|
|||
|
|
|
|||
|
|
return buffer.tobytes() if success else None
|
|||
|
|
|
|||
|
|
buffer_bytes = await loop.run_in_executor(invade_executor, encode_frame)
|
|||
|
|
buffer_origin_bytes = await loop.run_in_executor(invade_executor, encode_origin_frame)
|
|||
|
|
if not buffer_bytes:
|
|||
|
|
continue
|
|||
|
|
|
|||
|
|
# 并行处理上传和MQTT发布
|
|||
|
|
async def upload_and_publish():
|
|||
|
|
# 上传到MinIO
|
|||
|
|
def upload_minio():
|
|||
|
|
minio_path, file_type = upload_frame_buff_from_buffer(buffer_bytes, None)
|
|||
|
|
minio_origin_path, file_type = upload_frame_buff_from_buffer(buffer_origin_bytes, None)
|
|||
|
|
return minio_path, minio_origin_path, file_type
|
|||
|
|
# return upload_frame_buff_from_buffer(buffer_bytes, None)
|
|||
|
|
|
|||
|
|
minio_path, minio_origin_path, file_type = await loop.run_in_executor(
|
|||
|
|
invade_executor, upload_minio
|
|||
|
|
)
|
|||
|
|
print(f"minio_pathminio_pathminio_pathminio_path {minio_path}")
|
|||
|
|
# 构造消息
|
|||
|
|
message = {
|
|||
|
|
"task_id": task_id,
|
|||
|
|
"time": str(datetime.now()),
|
|||
|
|
"detection_id": timestamp,
|
|||
|
|
"minio": {"minio_path": minio_path,
|
|||
|
|
"minio_origin_path": minio_origin_path,
|
|||
|
|
"file_type": file_type},
|
|||
|
|
"box_detail": [{
|
|||
|
|
"model_id": model_func_id,
|
|||
|
|
"cls_count": cls_count,
|
|||
|
|
"box_count": [message_point], # 特殊处理
|
|||
|
|
"location_results": location_results # 增加经纬度信息
|
|||
|
|
}],
|
|||
|
|
"osd_location": {
|
|||
|
|
"longitude": cam_longitude,
|
|||
|
|
"latitude": cam_latitude
|
|||
|
|
},
|
|||
|
|
"des_location": des_location_result
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
if not event_queue.full():
|
|||
|
|
await event_queue.put({
|
|||
|
|
"timestamp": timestamp # 存储事件触发的时刻,用作视频制作
|
|||
|
|
})
|
|||
|
|
else:
|
|||
|
|
logger.warning("event_queue 帧队列已满,等待1ms后重试")
|
|||
|
|
await asyncio.sleep(0.001)
|
|||
|
|
|
|||
|
|
print(f"hongxianhongxianhongxianhongxian上传 {message}:{event_queue.qsize()} ")
|
|||
|
|
message_json = json.dumps(message, ensure_ascii=False)
|
|||
|
|
await mqtt.publish(mqtt_publish_topic, message_json)
|
|||
|
|
|
|||
|
|
asyncio.create_task(upload_and_publish())
|
|||
|
|
# 使用共享变量时加锁,进而进行跳帧,不然上报太频繁
|
|||
|
|
|
|||
|
|
# async with invade_cache_lock:
|
|||
|
|
# # 读取或修改共享变量
|
|||
|
|
# send_count = shared_local_cache["invade_send_count"]
|
|||
|
|
# send_count = send_count + 1
|
|||
|
|
# shared_local_cache["invade_send_count"] = send_count
|
|||
|
|
# if send_count > 1:
|
|||
|
|
# # 创建独立任务执行上传和发布
|
|||
|
|
# shared_local_cache["invade_send_count"] = 0
|
|||
|
|
# asyncio.create_task(upload_and_publish())
|
|||
|
|
|
|||
|
|
except Exception as e:
|
|||
|
|
print(f"cal_des_invade 错误: {e}")
|
|||
|
|
import traceback
|
|||
|
|
traceback.print_exc()
|
|||
|
|
await asyncio.sleep(0.1)
|
|||
|
|
|
|||
|
|
print("cal_des_invade读取线程已停止")
|
|||
|
|
|
|||
|
|
|
|||
|
|
def upload_process_main(group_id: str, task_id: str,
|
|||
|
|
mqtt_pub_ip: str, mqtt_pub_port: int,
|
|||
|
|
mqtt_pub_topic: str,
|
|||
|
|
device_height: float, repeat_dis: float,
|
|||
|
|
repeat_time: float,
|
|||
|
|
group_queues: Dict):
|
|||
|
|
"""进程B的主函数 - 优化版本(集成三个异步方法)"""
|
|||
|
|
|
|||
|
|
pid = os.getpid()
|
|||
|
|
logger.info(f"[ProcessB] 启动 (PID:{pid}),任务ID: {task_id}")
|
|||
|
|
|
|||
|
|
# 检查CPU状态
|
|||
|
|
check_cpu_affinity()
|
|||
|
|
|
|||
|
|
# 设置低优先级
|
|||
|
|
try:
|
|||
|
|
os.nice(19) # 最低优先级
|
|||
|
|
except:
|
|||
|
|
pass
|
|||
|
|
|
|||
|
|
async def run_upload_tasks():
|
|||
|
|
try:
|
|||
|
|
# 初始化MQTT服务
|
|||
|
|
mqtt = MQTTService(mqtt_pub_ip, port=mqtt_pub_port)
|
|||
|
|
await mqtt.connect()
|
|||
|
|
|
|||
|
|
# 初始化队列和事件
|
|||
|
|
stop_event = group_queues.get('stop_event')
|
|||
|
|
cv_frame_queue = group_queues.get('cv_frame_queue')
|
|||
|
|
invade_queue = group_queues.get('invade_queue')
|
|||
|
|
event_queue = group_queues.get('event_queue')
|
|||
|
|
timestamp_frame_queue = group_queues.get('timestamp_frame_queue')
|
|||
|
|
|
|||
|
|
# 创建线程池执行器
|
|||
|
|
loop = asyncio.get_running_loop()
|
|||
|
|
upload_executor = ThreadPoolExecutor(max_workers=Config.UPLOAD_WORKERS)
|
|||
|
|
invade_executor = ThreadPoolExecutor(max_workers=Config.INVADE_WORKERS)
|
|||
|
|
event_video_executor = ThreadPoolExecutor(max_workers=Config.EVENT_VIDEO_WORKERS)
|
|||
|
|
|
|||
|
|
# 创建取消事件
|
|||
|
|
cancel_flag = asyncio.Event()
|
|||
|
|
|
|||
|
|
# 创建三个主要任务
|
|||
|
|
tasks = [
|
|||
|
|
# 任务1: 处理侵限事件
|
|||
|
|
# asyncio.create_task(
|
|||
|
|
# cal_des_invade(
|
|||
|
|
# loop=loop,
|
|||
|
|
# invade_executor=invade_executor,
|
|||
|
|
# task_id=task_id,
|
|||
|
|
# mqtt=mqtt,
|
|||
|
|
# mqtt_publish_topic=mqtt_pub_topic,
|
|||
|
|
# list_points=[], # 需要从外部传入
|
|||
|
|
# camera_para=None, # 需要从外部传入
|
|||
|
|
# model_count=1, # 需要根据实际情况设置
|
|||
|
|
# cancel_flag=cancel_flag,
|
|||
|
|
# invade_queue=invade_queue,
|
|||
|
|
# event_queue=event_queue,
|
|||
|
|
# device_height=device_height,
|
|||
|
|
# repeat_dis=repeat_dis,
|
|||
|
|
# repeat_time=repeat_time
|
|||
|
|
# ),
|
|||
|
|
# name=f"{group_id}_cal_des_invade"
|
|||
|
|
# ),
|
|||
|
|
|
|||
|
|
# # 任务2: 发送帧到S3和MQTT
|
|||
|
|
# asyncio.create_task(
|
|||
|
|
# send_frame_to_s3_mq(
|
|||
|
|
# loop=loop,
|
|||
|
|
# upload_executor=upload_executor,
|
|||
|
|
# task_id=task_id,
|
|||
|
|
# mqtt=mqtt,
|
|||
|
|
# mqtt_topic=mqtt_pub_topic,
|
|||
|
|
# cancel_flag=cancel_flag,
|
|||
|
|
# cv_frame_queue=cv_frame_queue,
|
|||
|
|
# event_queue=event_queue,
|
|||
|
|
# device_height=device_height,
|
|||
|
|
# repeat_dis=repeat_dis,
|
|||
|
|
# repeat_time=repeat_time
|
|||
|
|
# ),
|
|||
|
|
# name=f"{group_id}_send_frame_to_s3_mq"
|
|||
|
|
# ),
|
|||
|
|
|
|||
|
|
# 任务3: 切割事件视频并发布
|
|||
|
|
asyncio.create_task(
|
|||
|
|
cut_evnt_video_publish(
|
|||
|
|
# cut_evnt_video_publish_test(
|
|||
|
|
task_id=task_id,
|
|||
|
|
mqtt=mqtt,
|
|||
|
|
mqtt_topic=mqtt_pub_topic,
|
|||
|
|
cancel_flag=cancel_flag,
|
|||
|
|
# event_queue=event_queue,
|
|||
|
|
# timestamp_frame_queue=timestamp_frame_queue
|
|||
|
|
event_queue=event_queue,
|
|||
|
|
timestamp_frame_queue=timestamp_frame_queue
|
|||
|
|
),
|
|||
|
|
name=f"{group_id}_cut_evnt_video_publish"
|
|||
|
|
)
|
|||
|
|
]
|
|||
|
|
|
|||
|
|
# 监控停止事件
|
|||
|
|
while not stop_event.is_set():
|
|||
|
|
await asyncio.sleep(0.1)
|
|||
|
|
|
|||
|
|
# 收到停止信号,取消所有任务
|
|||
|
|
cancel_flag.set()
|
|||
|
|
for task in tasks:
|
|||
|
|
task.cancel()
|
|||
|
|
|
|||
|
|
# 等待任务结束
|
|||
|
|
await asyncio.gather(*tasks, return_exceptions=True)
|
|||
|
|
|
|||
|
|
except Exception as e:
|
|||
|
|
logger.error(f"[ProcessB] 异常: {e}")
|
|||
|
|
error_queue = group_queues.get('error_queue')
|
|||
|
|
if error_queue:
|
|||
|
|
error_queue.put({
|
|||
|
|
'group_id': group_id,
|
|||
|
|
'process': 'ProcessB',
|
|||
|
|
'error': str(e),
|
|||
|
|
'time': datetime.now()
|
|||
|
|
})
|
|||
|
|
finally:
|
|||
|
|
# 清理资源
|
|||
|
|
if 'mqtt' in locals():
|
|||
|
|
await mqtt.disconnect()
|
|||
|
|
if 'upload_executor' in locals():
|
|||
|
|
upload_executor.shutdown(wait=True)
|
|||
|
|
if 'invade_executor' in locals():
|
|||
|
|
invade_executor.shutdown(wait=True)
|
|||
|
|
if 'event_video_executor' in locals():
|
|||
|
|
event_video_executor.shutdown(wait=True)
|
|||
|
|
|
|||
|
|
try:
|
|||
|
|
# 运行异步上传任务
|
|||
|
|
asyncio.run(run_upload_tasks())
|
|||
|
|
except Exception as e:
|
|||
|
|
logger.error(f"[ProcessB] 主循环异常: {e}")
|
|||
|
|
error_queue = group_queues.get('error_queue')
|
|||
|
|
if error_queue:
|
|||
|
|
error_queue.put({
|
|||
|
|
'group_id': group_id,
|
|||
|
|
'process': 'ProcessB',
|
|||
|
|
'error': str(e),
|
|||
|
|
'time': datetime.now()
|
|||
|
|
})
|
|||
|
|
|
|||
|
|
|
|||
|
|
def start_rtmp_processing_with_groups(video_url: str, task_id: str,
|
|||
|
|
model_configs: List[Dict],
|
|||
|
|
mqtt_pub_ip: str, mqtt_pub_port: int,
|
|||
|
|
mqtt_pub_topic: str,
|
|||
|
|
mqtt_sub_ip: str, mqtt_sub_port: int,
|
|||
|
|
mqtt_sub_topic: str,
|
|||
|
|
output_rtmp_url: str,
|
|||
|
|
invade_enable: bool, invade_file: str,
|
|||
|
|
camera_para_url: str,
|
|||
|
|
device_height: float, repeat_dis: float,
|
|||
|
|
repeat_time: float,
|
|||
|
|
process_group_manager: ProcessGroupManager = None):
|
|||
|
|
"""启动RTMP处理进程组"""
|
|||
|
|
|
|||
|
|
if process_group_manager is None:
|
|||
|
|
process_group_manager = ProcessGroupManager()
|
|||
|
|
|
|||
|
|
group_id = f"group_{task_id}"
|
|||
|
|
|
|||
|
|
# 创建进程组
|
|||
|
|
result = process_group_manager.create_process_group(
|
|||
|
|
group_id=group_id,
|
|||
|
|
video_url=video_url,
|
|||
|
|
task_id=task_id,
|
|||
|
|
model_configs=model_configs,
|
|||
|
|
mqtt_pub_ip=mqtt_pub_ip,
|
|||
|
|
mqtt_pub_port=mqtt_pub_port,
|
|||
|
|
mqtt_pub_topic=mqtt_pub_topic,
|
|||
|
|
mqtt_sub_ip=mqtt_sub_ip,
|
|||
|
|
mqtt_sub_port=mqtt_sub_port,
|
|||
|
|
mqtt_sub_topic=mqtt_sub_topic,
|
|||
|
|
output_rtmp_url=output_rtmp_url,
|
|||
|
|
invade_enable=invade_enable,
|
|||
|
|
invade_file=invade_file,
|
|||
|
|
camera_para_url=camera_para_url,
|
|||
|
|
device_height=device_height,
|
|||
|
|
repeat_dis=repeat_dis,
|
|||
|
|
repeat_time=repeat_time
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
return process_group_manager, result
|
|||
|
|
|
|||
|
|
|
|||
|
|
if __name__ == '__main__':
|
|||
|
|
# 系统检查
|
|||
|
|
logger.info("=== 高性能AI视频处理系统 ===")
|
|||
|
|
logger.info(f"Python版本: {sys.version}")
|
|||
|
|
logger.info(f"操作系统: {sys.platform}")
|
|||
|
|
|
|||
|
|
# CPU信息
|
|||
|
|
total_cpus = psutil.cpu_count(logical=False)
|
|||
|
|
logical_cpus = psutil.cpu_count(logical=True)
|
|||
|
|
logger.info(f"CPU核心: {total_cpus}物理/{logical_cpus}逻辑")
|
|||
|
|
|
|||
|
|
# 检查隔离支持
|
|||
|
|
try:
|
|||
|
|
with open('/proc/cmdline', 'r') as f:
|
|||
|
|
cmdline = f.read()
|
|||
|
|
if 'isolcpus=' in cmdline:
|
|||
|
|
logger.info("✅ 系统支持CPU隔离")
|
|||
|
|
else:
|
|||
|
|
logger.info("⚠️ 未启用isolcpus,使用软件隔离")
|
|||
|
|
except:
|
|||
|
|
pass
|
|||
|
|
|
|||
|
|
multiprocessing.freeze_support()
|
|||
|
|
multiprocessing.set_start_method('spawn', force=True)
|
|||
|
|
|
|||
|
|
process_group_manager = ProcessGroupManager()
|
|||
|
|
|
|||
|
|
# 配置参数
|
|||
|
|
model_configs = [{
|
|||
|
|
'path': r"/mnt/mydisk/ai/pt/best.pt",
|
|||
|
|
'engine_path': r"/mnt/mydisk/ai/engine/renche/renche.engine",
|
|||
|
|
'so_path': r"/mnt/mydisk/ai/engine/renche/libmyplugins.so",
|
|||
|
|
# 'path': r"D:\project\AI-PYTHON\Ai_tottle\pt\best.pt",
|
|||
|
|
# 'engine_path': r"D:\project\AI-PYTHON\Ai_tottle\engine\renche\renche.engine",
|
|||
|
|
# 'so_path': r"D:\project\AI-PYTHON\Ai_tottle\engine\renche\myplugins.dll",
|
|||
|
|
# 'cls_map': {'hote(infrared)&fire': '火', 'smoke': '烟'},
|
|||
|
|
# 'allowed_classes': ['smoke', 'hote(infrared)&fire'],
|
|||
|
|
# "cls_index": [0, 1],
|
|||
|
|
# "class_names": ['smoke', 'hote(infrared)&fire'],
|
|||
|
|
# "chinese_label": {0: '烟', 1: '火'},
|
|||
|
|
# "func_id": 100041,
|
|||
|
|
# {0: 'people', 1: 'bicycle',
|
|||
|
|
# 2: 'car', 3: 'van', 4: 'truck',
|
|||
|
|
# 5: 'tricycle', 6: 'awning-tricycle',
|
|||
|
|
# 7: 'bus',8: 'motor'}
|
|||
|
|
# "func_id": 100041,
|
|||
|
|
'cls_map': {'people': '人', 'bicycle': '自行车',
|
|||
|
|
'car': '车1', 'van': '车2',
|
|||
|
|
'truck': '车3', 'tricycle': '车4',
|
|||
|
|
'awning-tricycle': '车5', 'bus': '车6',
|
|||
|
|
'motor': '车7', },
|
|||
|
|
'allowed_classes': ['people', 'bicycle', 'car', 'van', 'truck', 'tricycle', 'awning-tricycle', 'bus', 'motor'],
|
|||
|
|
"cls_index": [0, 1, 2, 3, 4, 5, 6, 7, 8],
|
|||
|
|
"class_names": ['people', 'bicycle', 'car', 'van', 'truck', 'tricycle', 'awning-tricycle', 'bus', 'motor'],
|
|||
|
|
"chinese_label": {0: '人', 1: '自行车',
|
|||
|
|
2: '车1', 3: '车2',
|
|||
|
|
4: '车3', 5: '车4',
|
|||
|
|
6: '车5', 7: '车6',
|
|||
|
|
8: '车7',},
|
|||
|
|
"func_id": 100001,
|
|||
|
|
|
|||
|
|
"list_func_id": 100000,
|
|||
|
|
|
|||
|
|
"para_invade_enable": False,
|
|||
|
|
"config_conf": 0.4
|
|||
|
|
}]
|
|||
|
|
|
|||
|
|
# video_url = "rtmp://222.212.85.86:1935/live/1234567890"
|
|||
|
|
|
|||
|
|
task_id = "441ba16f-757f-481f-8f73-68a885fcc229-test"
|
|||
|
|
|
|||
|
|
# mqtt_pub_ip = "47.108.62.6"
|
|||
|
|
# mqtt_pub_port = 12503
|
|||
|
|
# mqtt_pub_ip = "8.137.54.85"
|
|||
|
|
# mqtt_pub_ip = "192.168.0.11"
|
|||
|
|
# mqtt_pub_port = 1883
|
|||
|
|
# mqtt_pub_topic = "thing/product/ai/events"
|
|||
|
|
# # mqtt_sub_ip = "47.108.62.6"
|
|||
|
|
# # mqtt_sub_port = 12503
|
|||
|
|
# # mqtt_sub_ip = "8.137.54.85"
|
|||
|
|
# mqtt_sub_ip = "192.168.0.11"
|
|||
|
|
# mqtt_sub_port = 1883
|
|||
|
|
# mqtt_sub_topic = "thing/product/8UUXN6S00A0CK7TEST/drc/up"
|
|||
|
|
# # output_rtmp_url = "rtmp://222.212.85.86:1935/live/1234567890ai123"
|
|||
|
|
# output_rtmp_url = "rtmp://192.168.0.11:1935/live/1234567890ai123"
|
|||
|
|
|
|||
|
|
# 九寨沟扎如
|
|||
|
|
# video_url = "rtmp://183.222.73.60:1935/live/1581F8HGX259H00A0WKH"
|
|||
|
|
# mqtt_sub_topic = "thing/product/8UUXNA700A0PWB/drc/up"
|
|||
|
|
# # 则查洼机场
|
|||
|
|
# video_url = "rtmp://183.222.73.60:1935/live/1581F8HGX259H00A0WJC"
|
|||
|
|
# mqtt_sub_topic = "thing/product/8UUXNA700A0PWA/drc/up"
|
|||
|
|
#
|
|||
|
|
# mqtt_pub_ip = "183.222.73.60"
|
|||
|
|
# mqtt_pub_port = 1883
|
|||
|
|
# mqtt_pub_topic = "thing/product/ai/events"
|
|||
|
|
# mqtt_sub_ip = "183.222.73.60"
|
|||
|
|
# mqtt_sub_port = 1883
|
|||
|
|
#
|
|||
|
|
# output_rtmp_url = "rtmp://222.212.85.86:1935/live/1234567890ai123"
|
|||
|
|
|
|||
|
|
### # 校友之家
|
|||
|
|
# video_url = "rtmp://222.212.85.86:1935/live/1581F6QAD243C00BMD6W"
|
|||
|
|
# video_url = "rtmp://222.212.85.86:1935/live/123456789012"
|
|||
|
|
video_url = "rtmp://192.168.130.21:1935/live/123456789012"
|
|||
|
|
# video_url = "rtmp://192.168.110.13/live/12345678"
|
|||
|
|
mqtt_sub_topic = "thing/product/8UUXN6S00A0CK7TEST/drc/up"
|
|||
|
|
#
|
|||
|
|
#
|
|||
|
|
mqtt_pub_ip = "47.108.62.6"
|
|||
|
|
mqtt_pub_port = 12503
|
|||
|
|
mqtt_pub_topic = "thing/product/ai/events"
|
|||
|
|
mqtt_sub_ip = "47.108.62.6"
|
|||
|
|
mqtt_sub_port = 12503
|
|||
|
|
|
|||
|
|
# output_rtmp_url = "rtmp://222.212.85.86:1935/live/1234567890ai123"
|
|||
|
|
output_rtmp_url = "rtmp://192.168.130.21:1935/live/123456789012ai"
|
|||
|
|
|
|||
|
|
|
|||
|
|
|
|||
|
|
|
|||
|
|
|
|||
|
|
|
|||
|
|
|
|||
|
|
invade_enable = True
|
|||
|
|
invade_file = "meta_data/校友之家耕地占用demo.geojson"
|
|||
|
|
camera_para_url = "meta_data/camera_para/xyzj_camera_para.txt"
|
|||
|
|
device_height = 600
|
|||
|
|
repeat_dis = 20
|
|||
|
|
repeat_time = 20
|
|||
|
|
|
|||
|
|
# 启动进程组
|
|||
|
|
process_group_manager, group_id1 = start_rtmp_processing_with_groups(
|
|||
|
|
video_url, task_id, model_configs,
|
|||
|
|
mqtt_pub_ip, mqtt_pub_port, mqtt_pub_topic,
|
|||
|
|
mqtt_sub_ip, mqtt_sub_port, mqtt_sub_topic,
|
|||
|
|
output_rtmp_url,
|
|||
|
|
invade_enable, invade_file, camera_para_url,
|
|||
|
|
device_height, repeat_dis, repeat_time,
|
|||
|
|
process_group_manager
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
if group_id1:
|
|||
|
|
logger.info(f"✅ 进程组 {group_id1} 启动成功")
|
|||
|
|
logger.info(f"进程A: 高速推理 (CPU 0-3)")
|
|||
|
|
logger.info(f"进程B: 后台上传 (CPU 4-5)")
|
|||
|
|
else:
|
|||
|
|
logger.error("❌ 进程组启动失败")
|
|||
|
|
sys.exit(1)
|
|||
|
|
|
|||
|
|
try:
|
|||
|
|
# 主监控
|
|||
|
|
monitor_count = 0
|
|||
|
|
while True:
|
|||
|
|
time.sleep(1)
|
|||
|
|
monitor_count += 1
|
|||
|
|
|
|||
|
|
if monitor_count % 10 == 0: # 每10秒打印一次
|
|||
|
|
logger.info(f"\n=== 系统监控 ({datetime.now()}) ===")
|
|||
|
|
|
|||
|
|
# 系统CPU使用率
|
|||
|
|
cpu_percent = psutil.cpu_percent(interval=1, percpu=True)
|
|||
|
|
# 只显示前6个CPU核心
|
|||
|
|
cpu_display = [f'{p:.1f}%' for p in cpu_percent[:6]]
|
|||
|
|
logger.info(f"CPU 0-5 使用率: {cpu_display}")
|
|||
|
|
|
|||
|
|
except KeyboardInterrupt:
|
|||
|
|
logger.info("\n=== 收到停止信号,正在停止... ===")
|
|||
|
|
process_group_manager.stop_all_groups(timeout=10)
|
|||
|
|
logger.info("=== 所有进程已停止 ===")
|
|||
|
|
except Exception as e:
|
|||
|
|
logger.error(f"主进程异常: {e}")
|
|||
|
|
process_group_manager.stop_all_groups(timeout=10)
|