ai_project_v1/check_start_rtmp_processing-yuanbao.py

3614 lines
158 KiB
Python
Raw Permalink Normal View History

import asyncio
import json
import logging
import multiprocessing
import subprocess
import time
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
from typing import List, Dict, Any, Optional
import queue
import os
import sys
import psutil
import threading
import av
import cv2
import numpy as np
import torch
from multiprocessing import Queue, Process, Event, Manager
from cv_back_video import cal_tricker_results
from middleware.MQTTService import MQTTService, MQTTDevice, empty_osd_callback
from middleware.entity.air_attitude import Air_Attitude
from middleware.entity.camera_para import read_camera_params, Camera_Para
from middleware.minio_util import downFile, upload_frame_buff_from_buffer, upload_video_buff_from_buffer
from touying.ImageReproject_python.cal_func import red_line_reproject, cal_canv_location_by_osd
from yolo.cv_multi_model_back_video import put_chinese_text, is_point_in_polygonlist, TrackIDEventFilter, \
haversine, frames_to_video_bytes
from yolo.detect.multi_yolo_trt_detect_track_trt8 import MultiYoloTrtDetectorTrackId_TRT8
from yolo.detect.multi_yolo_trt_detect_track_trt10_yolo11 import MultiYoloTrtDetectorTrackId_TRT10_YOLO11
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - [PID:%(process)d] - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
class Config:
    """Static tuning constants for the RTMP processing pipeline.

    The values are plain class attributes and are read as ``Config.NAME``.
    """

    MAX_WORKERS = 8

    # Queue capacities. PROCESSED_QUEUE_SIZE and Timestamped_Queue_SIZE are
    # used as ``maxsize`` for the inter-process queues of a process group.
    FRAME_QUEUE_SIZE = 25
    PROCESSED_QUEUE_SIZE = 60
    Timestamped_Queue_SIZE = 500

    RETRY_COUNT = 3

    # Per-stage worker counts.
    READ_RTMP_WORKERS = 2
    PROCESS_FRAME_WORKERS = 2
    UPLOAD_WORKERS = 2
    WRITE_FRAME_WORKERS = 2
    INVADE_WORKERS = 2
    EVENT_VIDEO_WORKERS = 2

    MAX_QUEUE_WARN_SIZE = 15
    MODEL_INPUT_SIZE = (640, 640)

    # Use the first CUDA device when torch reports one, otherwise the CPU.
    if torch.cuda.is_available():
        DEFAULT_DEVICE = "cuda:0"
    else:
        DEFAULT_DEVICE = "cpu"

    TARGET_FPS = 25
    MAX_FRAME_DROP = 5
    PERF_LOG_INTERVAL = 5.0

    # CPU isolation settings - each process gets its own dedicated CPU cores.
    CPU_ALLOCATIONS = dict(
        # RTMP processing process - high priority, high-speed inference.
        process_a=dict(
            cpus=[0, 1, 2, 3],  # four CPU cores
            numa_node=0,
            priority='high',
            sched_policy='fifo',  # FIFO scheduling
        ),
        # Upload process - low priority, background work.
        process_b=dict(
            cpus=[4, 5],  # two CPU cores
            numa_node=0,
            priority='low',
            sched_policy='other',
        ),
    )
async def detect_video_resolution(loop, executor, video_url):
    """Probe the resolution of a video stream with ``ffprobe``.

    The probe runs in ``executor`` so the blocking subprocess call does not
    stall the event loop.

    Args:
        loop: Event loop used for ``run_in_executor``; when ``None`` the
            running loop is used.
        executor: Executor passed to ``run_in_executor``.
        video_url: URL (or path) handed to ``ffprobe``.

    Returns:
        ``(width, height)``. Falls back to ``(1920, 1080)`` when the probe
        fails, times out (15 s) or prints nothing usable.
    """
    try:
        logger.info(f"开始探测视频分辨率: {video_url}")
        simple_cmd = [
            'ffprobe',
            '-v', 'error',
            '-select_streams', 'v:0',
            '-show_entries', 'stream=width,height',
            '-of', 'csv=p=0:s=x',
            video_url
        ]

        def parse_dimensions(output):
            """Return the first positive ``(width, height)`` found in *output*, else None."""
            # ffprobe may print the same stream more than once or append a
            # trailing separator ("1920x1080x"); a strict two-part split would
            # reject those, so take the first line with two leading integers.
            for line in output.splitlines():
                fields = [part for part in line.strip().split('x') if part]
                if len(fields) < 2:
                    continue
                try:
                    width, height = int(fields[0]), int(fields[1])
                except ValueError:
                    continue
                if width > 0 and height > 0:
                    return width, height
            return None

        def run_simple_probe():
            """Run ffprobe synchronously; best effort, never raises."""
            try:
                result = subprocess.run(simple_cmd, capture_output=True, text=True, timeout=15)
                if result.returncode == 0:
                    return parse_dimensions(result.stdout)
            except Exception as e:
                logger.warning(f"简单分辨率探测失败: {e}")
            return None

        if loop is None:
            loop = asyncio.get_running_loop()
        dimensions = await loop.run_in_executor(executor, run_simple_probe)
        if dimensions:
            width, height = dimensions
            logger.info(f"探测到视频分辨率: {width}x{height}")
            return width, height
        logger.warning("所有分辨率探测方法都失败,使用默认值 1920x1080")
        return 1920, 1080
    except Exception as e:
        logger.error(f"分辨率探测异常: {e}")
        logger.warning("使用默认分辨率 1920x1080")
        return 1920, 1080
class CPUIsolator:
    """CPU isolation manager - high-performance variant.

    Pins a process to a set of CPUs and adjusts its scheduling policy and
    niceness according to a config dict (see ``Config.CPU_ALLOCATIONS``).
    """

    # Absolute niceness applied for each symbolic priority. Any other
    # priority value leaves the target's niceness untouched.
    _NICE_BY_PRIORITY = {'high': -19, 'low': 19}

    def __init__(self):
        self.total_cpus = psutil.cpu_count(logical=False)
        self.logical_cpus = psutil.cpu_count(logical=True)
        logger.info(f"系统CPU信息:")
        logger.info(f" - 物理核心数: {self.total_cpus}")
        logger.info(f" - 逻辑核心数: {self.logical_cpus}")

    def _usable_cpus(self, cpu_list):
        """Return the entries of *cpu_list* that index a logical CPU of this host."""
        if not self.logical_cpus:
            # CPU count unknown: nothing to validate against.
            return list(cpu_list)
        usable = [cpu for cpu in cpu_list if 0 <= cpu < self.logical_cpus]
        if len(usable) != len(cpu_list):
            logger.warning(f"CPU列表 {cpu_list} 中部分核心不存在,实际使用: {usable}")
        return usable

    @staticmethod
    def _apply_sched_policy(pid: int, sched_policy: str) -> None:
        """Set the Linux scheduling policy of *pid*; unknown policies are ignored."""
        if sched_policy in ('fifo', 'rr'):
            # Real-time policies run at the maximum priority the policy allows.
            # NOTE(review): max-priority SCHED_FIFO can starve other tasks on
            # the pinned cores - confirm this is intended.
            policy = os.SCHED_FIFO if sched_policy == 'fifo' else os.SCHED_RR
            param = os.sched_param(os.sched_get_priority_max(policy))
            os.sched_setscheduler(pid, policy, param)
        elif sched_policy == 'batch':
            os.sched_setscheduler(pid, os.SCHED_BATCH, os.sched_param(0))

    def isolate_process(self, pid: int, config: Dict) -> bool:
        """Apply CPU affinity, scheduling policy and niceness to process *pid*.

        Args:
            pid: Target process id.
            config: Dict with optional keys ``cpus`` (list of CPU indices),
                ``priority`` (``'high'``/``'low'``/other) and ``sched_policy``
                (``'fifo'``/``'rr'``/``'batch'``/other).

        Returns:
            True when the affinity step completed, False when it raised.
            Failures of the scheduling-policy and niceness steps are logged
            as warnings and do not change the return value.
        """
        try:
            cpu_list = config.get('cpus', [])
            priority = config.get('priority', 'normal')
            sched_policy = config.get('sched_policy', 'other')
            logger.info(f"隔离进程 {pid} 到CPU: {cpu_list}, 优先级: {priority}, 调度策略: {sched_policy}")
            process = psutil.Process(pid)

            # 1. CPU affinity. A list that names only CPUs this host lacks
            #    must not abort the remaining steps, so it is skipped.
            usable = self._usable_cpus(cpu_list)
            if usable or not cpu_list:
                process.cpu_affinity(usable)
            else:
                logger.warning(f"CPU列表 {cpu_list} 在本机均不可用,跳过亲和性设置")

            # 2. Scheduling policy (Linux only).
            if sys.platform == "linux":
                try:
                    self._apply_sched_policy(pid, sched_policy)
                except Exception as e:
                    logger.warning(f"设置调度策略失败: {e}")

            # 3. Niceness. os.nice() would adjust the *calling* process and is
            #    an increment; psutil sets an absolute value on the target pid.
            nice_value = self._NICE_BY_PRIORITY.get(priority)
            if nice_value is not None:
                try:
                    process.nice(nice_value)
                except Exception as e:
                    logger.warning(f"设置nice值失败: {e}")

            # Verify the result.
            current_affinity = process.cpu_affinity()
            logger.info(f"进程 {pid} 隔离完成: CPU亲和性={current_affinity}")
            return True
        except Exception as e:
            logger.error(f"进程隔离失败: {e}")
            return False
def check_cpu_affinity():
    """Log the CPU affinity and scheduling policy of the current process.

    Returns:
        The affinity list reported by psutil, or ``[]`` when it cannot be
        read.
    """
    pid = os.getpid()
    try:
        process = psutil.Process(pid)
        affinity = process.cpu_affinity()
        # Resolve the scheduling policy name (Linux only); stays "unknown"
        # on other platforms or for policies not listed below.
        sched_policy = "unknown"
        if sys.platform == "linux":
            try:
                policy = os.sched_getscheduler(pid)
                for name in ("SCHED_FIFO", "SCHED_RR", "SCHED_BATCH", "SCHED_OTHER"):
                    if policy == getattr(os, name):
                        sched_policy = name
                        break
            except Exception as e:
                # Best effort: the policy is informational only.
                logger.debug(f"读取调度策略失败: {e}")
        logger.info(f"进程 {pid} CPU状态: 亲和性={affinity}, 调度策略={sched_policy}")
        return affinity
    except Exception as e:
        logger.warning(f"检查CPU状态失败: {e}")
        return []
class ProcessGroupManager:
    """Process group manager - high-performance variant.

    A group is a pair of child processes - process A (``rtmp_process_main``)
    and process B (``upload_process_main``) - that share one set of
    multiprocessing queues and a stop event.
    """

    def __init__(self):
        self.manager = Manager()
        self.process_groups = {}
        self.group_queues = {}
        self.cpu_isolator = CPUIsolator()

    def create_process_group(self, group_id: str,
                             video_url: str, task_id: str, model_configs: List[Dict],
                             mqtt_pub_ip: str, mqtt_pub_port: int, mqtt_pub_topic: str,
                             mqtt_sub_ip: str, mqtt_sub_port: int, mqtt_sub_topic: str,
                             output_rtmp_url: str,
                             invade_enable: bool, invade_file: str, camera_para_url: str,
                             device_height: float, repeat_dis: float, repeat_time: float):
        """Create the shared queues and start both processes of a group.

        Each process is pinned with ``CPUIsolator.isolate_process`` right
        after start and checked for liveness one second later.

        Returns:
            ``group_id`` on success, ``None`` when either process is not alive
            after its start-up wait (process A is terminated if B failed).
        """
        # Shared queues and flags handed to both processes.
        group_queues = {
            'cv_frame_queue': Queue(maxsize=Config.PROCESSED_QUEUE_SIZE),
            'event_queue': Queue(maxsize=Config.PROCESSED_QUEUE_SIZE),
            'invade_queue': Queue(maxsize=Config.PROCESSED_QUEUE_SIZE),
            'timestamp_frame_queue': Queue(maxsize=Config.Timestamped_Queue_SIZE),
            'stop_event': Event(),
            'error_queue': Queue(),
            # Performance counter. Created on the manager this object already
            # owns instead of spawning one extra manager process per group.
            'performance_counter': self.manager.Value('i', 0)
        }

        # Process A (RTMP processing, high priority).
        process_a = Process(
            target=rtmp_process_main,
            name=f"{group_id}_ProcessA",
            args=(group_id, video_url, task_id, model_configs,
                  mqtt_pub_ip, mqtt_pub_port, mqtt_pub_topic,
                  mqtt_sub_ip, mqtt_sub_port, mqtt_sub_topic,
                  output_rtmp_url, invade_enable, invade_file,
                  camera_para_url, group_queues)
        )
        process_a.start()
        logger.info(f"隔离进程A (PID:{process_a.pid})")
        self.cpu_isolator.isolate_process(process_a.pid, Config.CPU_ALLOCATIONS['process_a'])
        time.sleep(1)
        if not process_a.is_alive():
            logger.error("进程A启动失败")
            return None

        # Process B (upload handling, low priority).
        process_b = Process(
            target=upload_process_main,
            name=f"{group_id}_ProcessB",
            args=(group_id, task_id, mqtt_pub_ip, mqtt_pub_port,
                  mqtt_pub_topic, device_height, repeat_dis,
                  repeat_time, group_queues)
        )
        process_b.start()
        logger.info(f"隔离进程B (PID:{process_b.pid})")
        self.cpu_isolator.isolate_process(process_b.pid, Config.CPU_ALLOCATIONS['process_b'])
        time.sleep(1)
        if not process_b.is_alive():
            logger.error("进程B启动失败")
            # Roll back process A; join after terminate so it is reaped.
            group_queues['stop_event'].set()
            if process_a.is_alive():
                process_a.terminate()
                process_a.join(timeout=5)
            return None

        # Record the group.
        self.process_groups[group_id] = {
            'process_list': [process_a, process_b],
            'status': 'running',
            'created_at': datetime.now(),
            'task_id': task_id,
            'video_url': video_url
        }
        self.group_queues[group_id] = group_queues
        logger.info(f"进程组 {group_id} 创建完成")

        # Start monitoring.
        self.start_performance_monitor(group_id)
        return group_id

    def start_performance_monitor(self, group_id: str):
        """Start a daemon thread that periodically logs CPU/memory of the group's processes.

        The thread exits once the group is no longer registered or its status
        is no longer ``'running'``.
        """

        def monitor():
            last_time = time.time()
            while True:
                group_info = self.process_groups.get(group_id)
                # Stopped groups stay in process_groups, so membership alone
                # would keep this thread alive forever; check the status too.
                if not group_info or group_info.get('status') != 'running':
                    break
                try:
                    current_time = time.time()
                    if current_time - last_time >= Config.PERF_LOG_INTERVAL:
                        for i, process in enumerate(group_info.get('process_list', [])):
                            if not (process and process.is_alive()):
                                continue
                            try:
                                proc = psutil.Process(process.pid)
                                cpu_affinity = proc.cpu_affinity()
                                cpu_percent = proc.cpu_percent(interval=0.5)
                                memory_mb = proc.memory_info().rss / 1024 / 1024
                                # Index 0 is process A, anything else is logged as B.
                                label = "进程A性能" if i == 0 else "进程B性能"
                                logger.info(
                                    f"{label} - CPU:{cpu_affinity}, 使用率:{cpu_percent:.1f}%, 内存:{memory_mb:.1f}MB")
                            except Exception as e:
                                logger.warning(f"监控进程状态失败: {e}")
                        last_time = current_time
                    time.sleep(1)
                except Exception as e:
                    logger.warning(f"监控线程异常: {e}")
                    time.sleep(5)

        monitor_thread = threading.Thread(
            target=monitor,
            name=f"{group_id}_monitor",
            daemon=True
        )
        monitor_thread.start()

    def stop_process_group(self, group_id: str, timeout: int = 10):
        """Signal a group to stop and wait for its processes.

        Sets the group's stop event, joins each live process for up to
        ``timeout`` seconds and terminates any that is still alive. The group
        entry is kept with status ``'stopped'``; its queues are dropped.

        Returns:
            False when ``group_id`` is unknown, True otherwise.
        """
        if group_id not in self.process_groups:
            return False
        logger.info(f"停止进程组 {group_id}")
        group_info = self.process_groups[group_id]
        group_queues = self.group_queues.get(group_id, {})

        # Send the stop signal.
        stop_event = group_queues.get('stop_event')
        if stop_event:
            stop_event.set()

        # Wait for the processes to finish.
        for process in group_info.get('process_list') or []:
            if process and process.is_alive():
                process.join(timeout=timeout)
                if process.is_alive():
                    process.terminate()
                    # Reap the terminated child so it does not linger as a zombie.
                    process.join(timeout=timeout)

        if group_id in self.group_queues:
            del self.group_queues[group_id]
        group_info['status'] = 'stopped'
        group_info['stopped_at'] = datetime.now()
        logger.info(f"进程组 {group_id} 已停止")
        return True

    def stop_all_groups(self, timeout: int = 10):
        """Stop every registered process group."""
        logger.info("停止所有进程组")
        for group_id in list(self.process_groups.keys()):
            self.stop_process_group(group_id, timeout)
        logger.info("所有进程组已停止")
# # #
async def read_rtmp_frames(
        loop,
        read_rtmp_frames_executor: ThreadPoolExecutor,
        video_url: str,
        device: Optional[MQTTDevice] = None,
        topic_camera_osd: Optional[str] = None,
        method_camera_osd: Optional[str] = None,
        topic_osd_info: Optional[str] = None,
        method_osd_info: Optional[str] = None,
        cancel_flag: Optional[asyncio.Event] = None,
        frame_queue: asyncio.Queue = None,
        timestamp_frame_queue: Queue = None
):
    """Read frames from an RTMP stream and feed them to the pipeline queues.

    Opening the stream and decoding each frame run in
    ``read_rtmp_frames_executor`` so the event loop is not blocked. Every
    decoded frame is put on ``frame_queue`` as ``(image, osd_info, time_ns)``
    and, when there is room, on ``timestamp_frame_queue`` as
    ``{"timestamp": time_ns, "frame": image}``.

    Args:
        loop: Event loop used for ``run_in_executor``.
        read_rtmp_frames_executor: Executor for the blocking PyAV calls.
        video_url: Stream URL passed to ``av.open``.
        device: Optional MQTT device queried for the latest OSD message.
        topic_camera_osd: Accepted but not used by this function.
        method_camera_osd: Accepted but not used by this function.
        topic_osd_info: Topic of the OSD message; OSD lookup needs
            ``device``, ``topic_osd_info`` and ``method_osd_info`` all set.
        method_osd_info: Method of the OSD message.
        cancel_flag: Set to stop reading; a private event is created if None.
        frame_queue: asyncio queue receiving every frame (awaits when full).
        timestamp_frame_queue: Optional multiprocessing queue; frames are
            dropped for this queue when it is full.

    Raises:
        RuntimeError: The last of 20 connection attempts failed with
            ``av.AVError``/``IOError``.
        asyncio.CancelledError: Re-raised when the task is cancelled.
    """
    max_retries = 20
    retry_delay = 2
    attempt = 0
    frame_count = 0  # frames read since the function started
    window_frames = 0  # frames read in the current one-second window
    read_start_ns = time.time_ns()  # kept for the final statistics
    window_start_ns = read_start_ns  # reset every second for the periodic print
    if cancel_flag is None:
        cancel_flag = asyncio.Event()
    osd_enabled = bool(device and topic_osd_info and method_osd_info)

    def sync_frame_iter(source):
        """Decode *source* in a worker thread, yielding BGR ndarrays; closes *source* at the end."""
        try:
            for frame in source.decode(video=0):
                # Checked inside the worker thread so it can exit promptly.
                if cancel_flag.is_set():
                    logger.info("后台线程检测到取消信号,停止帧迭代")
                    break
                # NOTE(review): a single plane is treated as grayscale; packed
                # colour formats also have one plane - confirm the inputs.
                if len(frame.planes) == 1:
                    gray = frame.to_ndarray(format='gray')
                    yield cv2.cvtColor(gray, cv2.COLOR_GRAY2BGR)
                else:
                    yield frame.to_ndarray(format='bgr24')
        except Exception as e:
            logger.error(f"同步帧迭代出错: {e}")
        finally:
            if source:
                source.close()
                logger.info("RTMP容器已关闭")

    print(f"开始读取RTMP流: {video_url}")
    while not cancel_flag.is_set() and attempt < max_retries:
        attempt += 1
        container = None
        try:
            logger.info(f"尝试连接 RTMP 流 (尝试 {attempt}/{max_retries}): {video_url}")
            # 1. Open the stream in the thread pool (av.open blocks).
            container = await loop.run_in_executor(read_rtmp_frames_executor, av.open, video_url)
            # A default is required: StopIteration must not be raised into a Future.
            video_stream = await loop.run_in_executor(
                read_rtmp_frames_executor, next,
                (s for s in container.streams if s.type == 'video'), None)
            if video_stream is None:
                raise IOError(f"RTMP 流中没有视频流: {video_url}")
            logger.info(f"成功连接到 RTMP 流: {video_url} ({video_stream.width}x{video_stream.height})")

            # 2. Fetch one OSD message up front to check that MQTT delivers data.
            if osd_enabled:
                osd_msg = device.get_latest_message(topic=topic_osd_info, method=method_osd_info)
                if osd_msg and hasattr(osd_msg, 'data'):
                    logger.info(f"初始OSD消息获取成功: 高度={osd_msg.data.height}")
                else:
                    logger.warning("初始OSD消息为空可能MQTT尚未收到消息")

            # 3. Pull frames one at a time through the thread pool.
            frames = sync_frame_iter(container)
            while not cancel_flag.is_set():
                try:
                    frame = await loop.run_in_executor(read_rtmp_frames_executor, next, frames, None)
                except Exception as e:
                    logger.error(f"异步获取帧出错: {e}")
                    break
                if frame is None:  # iteration finished
                    logger.info("RTMP流帧迭代结束")
                    break
                try:
                    img = frame.copy()  # own buffer, independent of the decoder's
                    osd_info = None
                    # 4. Attach the most recent OSD attitude, if any.
                    if osd_enabled:
                        osd_msg = device.get_latest_message(topic=topic_osd_info, method=method_osd_info)
                        if osd_msg and hasattr(osd_msg, 'data'):
                            osd_info = Air_Attitude(
                                gimbal_pitch=osd_msg.data.gimbal_pitch,
                                gimbal_roll=osd_msg.data.gimbal_roll,
                                gimbal_yaw=osd_msg.data.gimbal_yaw,
                                height=osd_msg.data.height,
                                latitude=osd_msg.data.latitude,
                                longitude=osd_msg.data.longitude
                            )
                    window_frames += 1
                    frame_count += 1
                    time_ns = time.time_ns()
                    # Print the read rate once per second.
                    if time_ns - window_start_ns > 1000000000:
                        print(f"readFrames {window_frames}")
                        window_frames = 0
                        window_start_ns = time_ns
                    # 5. Hand the frame to the inference stage (awaits when full).
                    await frame_queue.put((img, osd_info, time_ns))
                    if timestamp_frame_queue is not None and not timestamp_frame_queue.full():
                        print(f"timestamp_frame_queue size {timestamp_frame_queue.qsize()}")
                        try:
                            # Non-blocking: a blocking put() here would stall
                            # the event loop if the queue filled up meanwhile.
                            timestamp_frame_queue.put_nowait(
                                {
                                    "timestamp": time_ns,
                                    "frame": img
                                })
                        except queue.Full:
                            pass  # same outcome as the full() check: drop it
                    logger.debug(
                        f"已放入帧队列,累计帧数: {frame_count},队列剩余空间: {frame_queue.maxsize - frame_queue.qsize()}")
                except Exception as frame_error:
                    logger.error(f"处理单帧时出错: {frame_error}", exc_info=True)
                    continue
            if cancel_flag.is_set():
                logger.info("检测到取消信号,停止读取帧")
        except (av.AVError, IOError) as e:
            logger.error(f"RTMP 流错误 (尝试 {attempt}/{max_retries}): {e}")
            if attempt < max_retries:
                await asyncio.sleep(retry_delay)
            else:
                raise RuntimeError(f"无法连接 RTMP 流 (尝试 {max_retries} 次后失败): {video_url}") from e
        except asyncio.CancelledError:
            logger.info("read_rtmp_frames 收到取消信号")
            raise
        except Exception as e:
            logger.error(f"未知错误: {e}", exc_info=True)
            if attempt < max_retries:
                await asyncio.sleep(retry_delay)
        finally:
            # Belt and braces: make sure the container is closed.
            # NOTE(review): not sure every PyAV version exposes `closed`;
            # read it with a default so cleanup cannot raise AttributeError.
            if container is not None and not getattr(container, "closed", False):
                await loop.run_in_executor(None, container.close)
                logger.info("RTMP容器在finally中关闭")

    # Final statistics, measured from the start of the function.
    if frame_count > 0:
        total_time = (time.time_ns() - read_start_ns) / 1e9
        avg_fps = frame_count / total_time if total_time > 0 else 0
        print(f"RTMP流读取完成总帧数: {frame_count}, 总时间: {total_time:.2f}秒, 平均FPS: {avg_fps:.2f}")
    else:
        print("RTMP流读取失败未获取到任何帧")
    logger.info(f"RTMP 流已结束或被取消,累计处理帧数: {frame_count}")
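# NOTE: the commented-out code from here down to the next live definition is
# disabled and kept for reference only. The first function is an alternative
# read_rtmp_frames that reads raw bgr24 frames from an ffmpeg subprocess pipe
# instead of decoding with PyAV.
#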
# async def read_rtmp_frames(
# loop,
# read_rtmp_frames_executor: ThreadPoolExecutor,
# video_url: str,
# device: Optional[MQTTDevice] = None,
# topic_camera_osd: Optional[str] = None,
# method_camera_osd: Optional[str] = None,
# topic_osd_info: Optional[str] = None,
# method_osd_info: Optional[str] = None,
# cancel_flag: Optional[asyncio.Event] = None,
# frame_queue: asyncio.Queue = None,
# timestamp_frame_queue: TimestampedQueue = None
# ):
# """
# 基于 FFmpeg 读取 RTMP 流帧(优化版:高性能读取,处理损坏帧)
# """
# print("read_rtmp_frames")
# max_retries = 20
# retry_delay = 2
# pic_count = 0
# attempt = 0
# time_start = time.time_ns()
# frame_count = 0
#
# if cancel_flag is None:
# cancel_flag = asyncio.Event()
#
# print(f"开始读取RTMP流: {video_url}")
#
# while not cancel_flag.is_set() and attempt < max_retries:
# attempt += 1
# if cancel_flag.is_set():
# logger.info("收到停止信号,终止 RTMP 读取")
# break
#
# ffmpeg_process = None
# width, height = None, None
# frame_size = None
#
# try:
# logger.info(f"尝试连接 RTMP 流 (尝试 {attempt}/{max_retries}): {video_url}")
#
# # 1. 探测视频分辨率
# width, height = await detect_video_resolution(loop, read_rtmp_frames_executor, video_url)
#
# if width is None or height is None:
# logger.warning("使用默认分辨率 1920x1080")
# width, height = 1920, 1080
#
# frame_size = width * height * 3
# logger.info(f"视频分辨率: {width}x{height}, 帧大小: {frame_size} bytes")
#
# # 2. 启动 FFmpeg 进程(优化参数提高性能)
# ffmpeg_cmd = [
# 'ffmpeg',
# '-i', video_url,
# '-loglevel', 'warning',
# '-fflags', 'nobuffer',
# '-flags', 'low_delay',
# '-avioflags', 'direct',
# '-threads', '1', # 单线程,减少上下文切换
# '-f', 'image2pipe',
# '-pix_fmt', 'bgr24',
# '-vcodec', 'rawvideo',
# '-'
# ]
#
# ffmpeg_process = await loop.run_in_executor(
# read_rtmp_frames_executor,
# lambda: subprocess.Popen(
# ffmpeg_cmd,
# stdout=subprocess.PIPE,
# stderr=subprocess.PIPE,
# bufsize=frame_size # 设置合适的缓冲区大小
# )
# )
#
# logger.info(f"成功启动 FFmpeg 进程连接 RTMP 流: {video_url}")
#
# # 3. 初始化帧读取状态
# frame_sequence = 0
# last_timestamp = time_start
# consecutive_corrupted_frames = 0 # 连续损坏帧计数
# max_consecutive_corrupted = 10 # 最大连续损坏帧数
#
# while not cancel_flag.is_set():
# try:
# # 直接读取完整帧(高性能方式)
# raw_frame = await loop.run_in_executor(
# read_rtmp_frames_executor,
# lambda: ffmpeg_process.stdout.read(frame_size)
# )
#
# if not raw_frame:
# logger.warning("读取到空帧数据,流可能已结束")
# break
#
# current_time_ns = time.time_ns()
# frame_sequence += 1
# frame_count += 1
#
# # 处理帧数据(无论是否完整)
# img = None
# is_corrupted = False
# # print(f"读取 read_rtmp_frames 判断")
# try:
# if len(raw_frame) == frame_size:
# # 完整帧处理
# frame = np.frombuffer(raw_frame, dtype=np.uint8).reshape((height, width, 3))
# img = frame.copy()
# consecutive_corrupted_frames = 0 # 重置连续损坏计数
# else:
# # 损坏帧处理
# logger.warning(f"帧数据损坏: {len(raw_frame)}/{frame_size} bytes, 序列: {frame_sequence}")
# is_corrupted = True
# consecutive_corrupted_frames += 1
#
# # 创建替代帧
# if consecutive_corrupted_frames <= max_consecutive_corrupted:
# # 尝试部分恢复
# img = np.zeros((height, width, 3), dtype=np.uint8)
# valid_data = min(len(raw_frame), frame_size)
# if valid_data > 0:
# # 尽可能填充有效数据
# temp_frame = np.frombuffer(raw_frame[:valid_data], dtype=np.uint8)
# img.flat[:len(temp_frame)] = temp_frame
# else:
# # 连续损坏过多,创建空白帧
# img = np.zeros((height, width, 3), dtype=np.uint8)
# logger.error(f"连续损坏帧过多 ({consecutive_corrupted_frames}),创建空白帧")
#
# except Exception as frame_error:
# logger.error(f"帧数据处理错误: {frame_error}")
# # 创建空白帧作为后备
# img = np.zeros((height, width, 3), dtype=np.uint8)
# is_corrupted = True
# consecutive_corrupted_frames += 1
# # print(f"读取 read_rtmp_frames 判断1")
# # 获取OSD信息
# osd_info = None
# if device and topic_osd_info and method_osd_info:
# try:
# osd_msg = device.get_latest_message(topic=topic_osd_info, method=method_osd_info)
# if osd_msg and hasattr(osd_msg, 'data'):
# osd_info = Air_Attitude(
# gimbal_pitch=osd_msg.data.gimbal_pitch,
# gimbal_roll=osd_msg.data.gimbal_roll,
# gimbal_yaw=osd_msg.data.gimbal_yaw,
# height=osd_msg.data.height,
# latitude=osd_msg.data.latitude,
# longitude=osd_msg.data.longitude
# )
# except Exception as osd_error:
# logger.warning(f"获取OSD信息失败: {osd_error}")
# # print(f"读取 read_rtmp_frames 判断2")
# # 放入帧队列
# # if img is not None and not frame_queue.full():
# if True:
# # 确保时间戳递增
# if current_time_ns <= last_timestamp:
# current_time_ns = last_timestamp + 1
# last_timestamp = current_time_ns
#
# # 统计信息
# pic_count += 1
# if current_time_ns - time_start > 1000000000: # 1秒
# elapsed_seconds = (current_time_ns - time_start) / 1e9
# fps = pic_count / elapsed_seconds if elapsed_seconds > 0 else 0
# corrupted_rate = (consecutive_corrupted_frames / pic_count * 100) if pic_count > 0 else 0
# print(
# f"readFrames 序列:{frame_sequence} 帧数:{pic_count} FPS:{fps:.2f} 损坏率:{corrupted_rate:.1f}%")
# pic_count = 0
# time_start = current_time_ns
#
# # print(f"读取 read_rtmp_frames 实时流")
#
# # # 准备帧数据
# # frame_data = {
# # "sequence": frame_sequence,
# # "frame": img,
# # "osd_info": osd_info,
# # "timestamp": current_time_ns,
# # "is_corrupted": is_corrupted
# # }
# time_ns = time.time_ns()
# # 放入队列
# await frame_queue.put((img, osd_info, time_ns))
# timestamp_frame_queue.append({
# "timestamp": time_ns,
# "frame": img
# })
#
# if frame_sequence % 100 == 0: # 每100帧输出一次日志
# logger.debug(f"已处理帧 序列:{frame_sequence} 累计:{frame_count}")
#
# elif frame_queue.full():
# logger.warning("帧队列已满,跳过此帧")
# await asyncio.sleep(0.001) # 短暂等待
#
# # 检查是否需要重新探测分辨率(仅在连续损坏时)
# if consecutive_corrupted_frames > 5:
# logger.warning("连续帧损坏,尝试重新探测分辨率")
# try:
# new_width, new_height = await detect_video_resolution(loop, read_rtmp_frames_executor,
# video_url)
# if new_width and new_height and (new_width != width or new_height != height):
# logger.info(f"分辨率变化: {width}x{height} -> {new_width}x{new_height}")
# width, height = new_width, new_height
# frame_size = width * height * 3
# consecutive_corrupted_frames = 0 # 重置计数
# except Exception as probe_error:
# logger.warning(f"重新探测分辨率失败: {probe_error}")
#
# except Exception as e:
# logger.error(f"读取帧数据时出错: {e}", exc_info=True)
# # 检查 FFmpeg 进程状态
# if ffmpeg_process and ffmpeg_process.poll() is not None:
# try:
# stderr_output = ffmpeg_process.stderr.read().decode('utf-8', errors='ignore')
# if stderr_output:
# logger.error(f"FFmpeg 进程错误: {stderr_output}")
# except:
# pass
# logger.error("FFmpeg 进程已退出")
# break
# # 短暂等待后继续
# await asyncio.sleep(0.01)
# continue
#
# except (subprocess.SubprocessError, IOError) as e:
# logger.error(f"RTMP 流错误 (尝试 {attempt}/{max_retries}): {e}")
# if attempt < max_retries:
# await asyncio.sleep(retry_delay)
# else:
# raise RuntimeError(f"无法连接 RTMP 流 (尝试 {max_retries} 次后失败): {video_url}")
# except asyncio.CancelledError:
# logger.info("read_rtmp_frames 收到取消信号")
# raise
# except Exception as e:
# logger.error(f"未知错误: {e}", exc_info=True)
# if attempt < max_retries:
# await asyncio.sleep(retry_delay)
# finally:
# if ffmpeg_process:
# try:
# ffmpeg_process.terminate()
# try:
# await asyncio.wait_for(
# loop.run_in_executor(read_rtmp_frames_executor, ffmpeg_process.wait),
# timeout=2.0
# )
# except asyncio.TimeoutError:
# ffmpeg_process.kill()
# await loop.run_in_executor(read_rtmp_frames_executor, ffmpeg_process.wait)
# except Exception as e:
# logger.warning(f"关闭FFmpeg进程时出错: {e}")
# logger.info("FFmpeg 进程已关闭")
#
# if frame_count > 0:
# total_time = (time.time_ns() - time_start) / 1e9
# avg_fps = frame_count / total_time if total_time > 0 else 0
# print(f"RTMP流读取完成总帧数: {frame_count}, 总时间: {total_time:.2f}秒, 平均FPS: {avg_fps:.2f}")
# else:
# print("RTMP流读取失败未获取到任何帧")
#
# logger.info(f"RTMP 流已结束或被取消,累计处理帧数: {frame_count}")
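#
# NOTE: disabled benchmark variant of high_speed_process_frames. It never reads
# frame_queue; it repeatedly runs the detector on one fixed test image in
# batches of 4.
#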
# async def high_speed_process_frames(detector: MultiYoloTrtDetectorTrackId_TRT8, cancel_flag: asyncio.Event,
# frame_queue: asyncio.Queue, processed_queue: asyncio.Queue,
# performance_counter):
# """高速处理帧 - 保持原有速度"""
# frame_count = 0
# start_time = time.time()
# last_log_time = start_time
#
# # 检查CPU状态
# check_cpu_affinity()
#
# # 设置高性能模式
# logger.info("[ProcessA] 进入高速推理模式")
#
# # 预加载图片
# # test_image = cv2.imread(r"/mnt/mydisk1/dj/ai2/middleware/20250711-111516-531_1761271860531810831.jpg")
# test_image = cv2.imread(r"E:\yolo-dataset\image\0000006_00611_d_0000002.jpg")
# if test_image is None:
# logger.error("[ProcessA] 无法加载测试图片")
# return
#
# # 使用高速循环
# batch_size = 4
# batch_frames = []
# batch_timestamps = []
#
# while not cancel_flag.is_set():
# try:
# # 高速处理,不添加额外延迟
# current_time = time.time_ns()
#
# # 添加到批次
# batch_frames.append(test_image.copy())
# batch_timestamps.append(current_time)
# frame_count += 1
#
# # 当批次满时进行推理
# if len(batch_frames) >= batch_size:
# try:
# # 批量推理
# time_pr_start = time.time_ns()
#
# # 批量处理每个帧
# batch_results = []
# for frame in batch_frames:
# detections, detections_list, model_para = await detector.predict(frame)
# batch_results.append({
# 'detections': detections,
# 'detections_list': detections_list,
# 'model_para': model_para
# })
#
# time_pr_end = time.time_ns()
# avg_inference_time = (time_pr_end - time_pr_start) / len(batch_frames) / 1000000
#
# # 批量放入队列
# for i, (frame, result, timestamp) in enumerate(zip(batch_frames, batch_results, batch_timestamps)):
# processed_data = {
# 'frame': frame,
# 'osd_info': None,
# 'detections': result['detections'],
# 'detections_list': result['detections_list'],
# 'timestamp': timestamp,
# 'model_para': result['model_para'],
# 'predict_state': bool(result['detections'])
# }
#
# if not processed_queue.full():
# await processed_queue.put(processed_data)
#
# # 性能统计
# current_time_log = time.time()
# if current_time_log - last_log_time >= 1.0: # 每秒输出一次
# total_elapsed = current_time_log - start_time
# fps = frame_count / total_elapsed
# logger.info(
# f"[ProcessA] 推理速度: {fps:.1f} FPS, 累计: {frame_count}帧, 平均推理: {avg_inference_time:.2f}ms")
# if performance_counter:
# performance_counter.value = frame_count
# last_log_time = current_time_log
#
# except Exception as e:
# logger.error(f"[ProcessA] 批量推理错误: {e}")
#
# # 清空批次
# batch_frames.clear()
# batch_timestamps.clear()
#
# # 微小延迟避免CPU 100%
# await asyncio.sleep(0.0001)
#
# except asyncio.CancelledError:
# break
# except Exception as e:
# logger.error(f"[ProcessA] 处理错误: {e}")
# await asyncio.sleep(0.01)
#
# logger.info(f"[ProcessA] 推理结束,总帧数: {frame_count}")
async def high_speed_process_frames(detector: MultiYoloTrtDetectorTrackId_TRT10_YOLO11, cancel_flag: asyncio.Event,
                                    frame_queue: asyncio.Queue, processed_queue: asyncio.Queue):
    """Coroutine that runs detection on queued frames and forwards the results.

    Takes ``(frame, osd_info, timestamp)`` tuples from ``frame_queue``, awaits
    ``detector.predict(frame)`` and puts a result dict on ``processed_queue``.
    A result is dropped (with a warning) when ``processed_queue`` is full.
    Runs until ``cancel_flag`` is set; cancellation is re-raised.
    """
    window_start_ns = time.time_ns()
    forwarded_in_window = 0
    while not cancel_flag.is_set():
        try:
            # Bounded wait so the cancel flag is re-checked at least every 0.5 s.
            frame, osd_info, timestamp = await asyncio.wait_for(
                frame_queue.get(),
                timeout=0.5
            )
            try:
                infer_begin_ns = time.time_ns()
                detections, detections_list, model_para = await detector.predict(frame)
                infer_end_ns = time.time_ns()
                # Inference time in milliseconds.
                print(f"time_pr_starttime_pr_start {(infer_end_ns - infer_begin_ns) / 1000000}")

                predict_state = bool(detections)
                if predict_state:
                    print("检测到任何目标")
                else:
                    logger.debug("未检测到任何目标")

                processed_data = {
                    'frame': frame,
                    'osd_info': osd_info,
                    'detections': detections,
                    'detections_list': detections_list,
                    'timestamp': timestamp,
                    'model_para': model_para,
                    # Lets the RTMP push stage tell whether anything was detected.
                    'predict_state': predict_state
                }
                if processed_queue.full():
                    logger.warning("处理队列已满,丢弃帧")
                else:
                    now_ns = time.time_ns()
                    forwarded_in_window += 1
                    # Print the forwarding rate roughly once per second.
                    if now_ns - window_start_ns > 1000000000:
                        print(f"processframes {forwarded_in_window}")
                        forwarded_in_window = 0
                        window_start_ns = now_ns
                    await processed_queue.put(processed_data)
            except Exception as e:
                logger.error(f"处理帧时出错: {e}", exc_info=True)
                await asyncio.sleep(0.1)
        except asyncio.TimeoutError:
            continue
        except asyncio.CancelledError:
            print("process_frames 收到取消信号")
            raise
        except Exception as e:
            logger.error(f"获取帧时发生意外错误: {e}", exc_info=True)
            await asyncio.sleep(0.1)
    print("process_frames读取线程已停止")
# async def write_results_to_rtmp_high_speed(
# task_id: str,
# output_url: str = None,
# input_fps: float = None,
# list_points: List[List[Any]] = None,
# camera_para: Camera_Para = None,
# invade_state: bool = False,
# cancel_flag: asyncio.Event = None,
# processed_queue: asyncio.Queue = None,
# invade_queue: Queue = None,
# cv_frame_queue: asyncio.Queue = None,
# cv_process_queue: Optional[Queue] = None,
# event_process_queue: Optional[Queue] = None,
# stream_containers: Dict[str, Any] = None
# ):
# """高速推流处理"""
# frame_count = 0
# start_time = time.time()
# last_log_time = start_time
#
# logger.info(f"[ProcessA] 推流任务启动 - 高速模式")
#
# try:
# while not cancel_flag.is_set():
# try:
# # 高速获取数据
# processed_data = await asyncio.wait_for(
# processed_queue.get(),
# timeout=0.01
# )
#
# frame_count += 1
#
# # 放入进程间队列
# if cv_process_queue is not None and not cv_process_queue.full():
# try:
# # 简化数据,只放关键信息
# simplified_data = {
# 'frame_count': frame_count,
# 'timestamp': processed_data.get('timestamp'),
# 'predict_state': processed_data.get('predict_state', False)
# }
# cv_process_queue.put_nowait(simplified_data)
# except queue.Full:
# pass
#
# # 性能统计
# current_time = time.time()
# if current_time - last_log_time >= 1.0: # 每秒输出一次
# elapsed = current_time - start_time
# fps = frame_count / elapsed
# logger.info(f"[ProcessA] 推流处理: {frame_count}帧, FPS: {fps:.1f}")
# last_log_time = current_time
#
# except asyncio.TimeoutError:
# # 队列为空,继续循环
# await asyncio.sleep(0.001)
# continue
# except asyncio.CancelledError:
# break
# except Exception as e:
# logger.error(f"[ProcessA] 推流错误: {e}")
#
# except Exception as e:
# logger.error(f"[ProcessA] 推流主循环异常: {e}")
#
# finally:
# logger.info(f"[ProcessA] 推流结束,总帧数: {frame_count}")
#
#
# async def write_results_to_rtmp_high_speed(task_id: str, output_url: str = None, input_fps: float = None,
# list_points: list[list[any]] = None, camera_para: Camera_Para = None,
# invade_state: bool = False, cancel_flag: asyncio.Event = None,
# cv_frame_thread_queue: Queue = None,
# processed_queue: asyncio.Queue = None,
# invade_queue: asyncio.Queue = None,
# cv_frame_queue: asyncio.Queue = None,
# event_queue: Optional[Queue] = None,
# stream_containers: Dict[str, Any] = None,
# ):
# # global stream_containers, count_pic
# start_time = time.time()
# time_start = time.time_ns()
# pic_count = 0
# # 修改推流参数
# options = {
# 'preset': 'veryfast',
# 'tune': 'zerolatency',
# 'crf': '23',
# 'g': '50', # 关键帧间隔
# 'threads': '2', # 限制编码线程
# }
# codec_name = 'libx264'
#
# max_retries = 3
# retry_delay = 2.0
#
# # 初始化视频输出
# output_video_path = None
# video_writer = None
# frame_width, frame_height = None, None
# fps = input_fps or Config.TARGET_FPS
# last_frame_time = time.time() - 1
# frame_interval = 1.0 / fps
# try:
# while not cancel_flag.is_set():
# frame_start = time.time()
# try:
# # 第一层帧率控制
# # current_time = time.time()
# #
# # time_diff = frame_interval - (current_time - last_frame_time)
# # if time_diff > 0:
# # await asyncio.sleep(time_diff)
# # last_frame_time = current_time
#
# processed_data = await asyncio.wait_for(
# processed_queue.get(),
# timeout=1
# )
#
# # 确保 processed_data 是字典
# if not isinstance(processed_data, dict):
# print(f"❌ 错误processed_data 不是字典,而是 {type(processed_data)}")
# continue
#
# frame = processed_data['frame']
# # 绘制检测结果
# frame_copy = frame.copy()
# predict_state = processed_data['predict_state']
# osd_info = processed_data['osd_info']
# img_height, img_width = frame.shape[:2]
#
# results = []
# results_list = []
# # 启用侵限且拿到了飞机的姿态信息,再绘制红线
# if invade_state and osd_info:
# gimbal_yaw = osd_info.gimbal_yaw
# gimbal_pitch = osd_info.gimbal_pitch
# gimbal_roll = osd_info.gimbal_roll
# height = osd_info.height
# # print(f"heightheightheight {height}")
# cam_longitude = osd_info.longitude
# cam_latitude = osd_info.latitude
# # 当前list_points 虽然是二维数组,但是只存了一个,后续根据业务变化
#
# for points in list_points:
# # 批量返回图像的像素坐标
# point_list = []
# results = red_line_reproject(gimbal_yaw, gimbal_pitch, gimbal_roll, height, cam_longitude,
# cam_latitude,
# img_width,
# img_height, points, camera_para)
# if results:
# results_list.append(results) # 支持两个区域,高压侵限、营业线侵限
# for point in results:
# point_list.append([point["u"], point["v"]])
# cv2.polylines(frame_copy, [np.array(point_list, dtype=np.int64)], isClosed=True,
# color=(0, 0, 255),
# thickness=2)
# # print(f"predict_statepredict_state {predict_state}")
# # 模型输出了推理结果
# if predict_state:
# # 测试代码,用做测试推理结果,初始化视频写入器(如果尚未初始化)
# if video_writer is None and output_video_path:
# frame_width = frame.shape[1]
# frame_height = frame.shape[0]
# # 定义视频编码器和输出文件
# fourcc = cv2.VideoWriter_fourcc(*'mp4v') # 或者使用 'avc1' 等其他编码
# # fps = Config.TARGET_FPS # 使用配置中的目标帧率
# video_writer = cv2.VideoWriter(
# output_video_path,
# fourcc,
# fps,
# (frame_width, frame_height)
# )
# print(f"视频写入器已初始化,分辨率: {frame_width}x{frame_height}, FPS: {fps}")
#
# detections = processed_data['detections']
# detections_list = processed_data['detections_list']
# model_para = processed_data['model_para']
#
# class_names = model_para[0]["model_class_names"]
# chinese_label = model_para[0]["model_chinese_labe"]
# cls_map = model_para[0]["cls_map"]
# para_invade_enable = model_para[0]["para_invade_enable"]
# model_list_func_id = model_para[0]["model_list_func_id"]
# model_func_id = model_para[0]["func_id"]
# invade_point = []
# message_point = []
# target_point = [] # 存储满足条件的图像坐标,方便后续经纬度转换
# cls_count = 0
#
# # # 初始化统计字典
# class_stats = defaultdict(int)
# reversed_dict = {value: 0 for value in cls_map.values()}
# bg_color = (173, 216, 230) # 文本框底色使用淡蓝色
# text_color = (0, 255, 0) # 绿色
#
# for det in detections:
# x1, y1, x2, y2 = map(int, det.bbox) # 确保坐标是整数
# cls_id = det.class_id # 假设Detection对象有class_id属性
# class_name = det.class_name
# confidence = det.confidence
# track_id = det.track_id
# new_track_id = track_id * 100 + cls_id # 类型小于100或者为负数
# # 更新统计
# class_stats[cls_id] += 1
# # 如果开起侵限功能,就只显示侵限内的框
# point_x = (x1 + x2) / 2
# point_y = (y1 + y2) / 2
# # print(f"class_name--{class_name}")
# # print(f"model_class_names: {model_para[0]['model_class_names']}")
#
# if class_name not in model_para[0]["model_class_names"]:
# continue
#
# en_name = model_para[0]["model_chinese_labe"][
# model_para[0]["model_class_names"].index(class_name)]
# if invade_state:
# # 同时适配多个区域的侵限判断
# is_invade = is_point_in_polygonlist(point_x, point_y, results_list)
# # is_invade = is_point_in_polygon(point_x, point_y, results)
# # print(f"is_invadeis_invadeis_invade {is_invade} {len(results)}")
# if is_invade:
# cls_count += 1
# invade_point.append({
# "u": point_x,
# "v": point_y,
# "class_name": class_name
# })
# target_point.append({
# "u": point_x,
# "v": point_y,
# "cls_id": cls_id,
# "track_id": track_id,
# "new_track_id": new_track_id
# }) # 对于侵限,只存储侵限目标
# # model_list_func_id = model_para[0]["model_list_func_id"]
# # model_func_id = model_para[0]["func_id"]
#
# message_point.append({
# "confidence": float(confidence),
# "cls_id": cls_id,
# "type_name": en_name,
# "track_id": track_id,
# "box": [x1, y1, x2, y2]
# })
# label = f"{en_name}:{confidence:.2f}:{track_id}"
# # 计算文本位置
# text_size = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, fontScale=8, thickness=4)[
# 0]
# text_width, text_height = text_size[0], text_size[1]
# text_x = x1
# text_y = y1 - 5
#
# # 如果文本超出图像顶部,则放在框内部下方
# if text_y < 0:
# text_y = y2 + text_height + 5
# temp_img = frame_copy.copy()
# frame_copy = put_chinese_text(
# temp_img,
# # label, # 置信度、类别、用作测试
# "", # 注释掉汉字
# (text_x, text_y - text_height),
# )
# else:
# cls_count += 1
# # 绘制边界框
# cv2.rectangle(frame_copy, (x1, y1), (x2, y2), (0, 255, 255), 2)
# message_point.append({
# "confidence": float(confidence),
# "cls_id": cls_id,
# "type_name": en_name,
# "track_id": track_id,
# "box": [x1, y1, x2, y2]
# })
# target_point.append({
# "u": point_x,
# "v": point_y,
# "cls_id": cls_id,
# "track_id": track_id,
# "new_track_id": new_track_id
# }) # 对于侵限,只存储侵限目标
# # 准备标签文本
# # label = f"{chinese_label.get(cls_id, class_name)}: {confidence:.2f}:{track_id}"
# label = f"{confidence:.2f}:{track_id}"
# # 计算文本位置
# text_size = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, fontScale=8, thickness=8)[0]
# text_width, text_height = text_size[0], text_size[1]
# text_x = x1
# text_y = y1 - 5
# # 如果文本超出图像顶部,则放在框内部下方
# if text_y < 0:
# text_y = y2 + text_height + 5
#
# # 绘制文本背景
# padding = 2
# temp_img = frame_copy.copy()
# frame_copy = put_chinese_text(
# temp_img,
# # label, # 置信度、类别、用作测试
# "", # 注释掉汉字
# (text_x, text_y - text_height),
# )
#
# if invade_state:
# for point in message_point:
# cv2.rectangle(frame_copy, (point["box"][0], point["box"][1]),
# (point["box"][2], point["box"][3]),
# (0, 255, 255), 2)
# # 画红线
# # 在左上角显示统计结果
# stats_text = []
# for cls_id, count in class_stats.items():
# cls_name = chinese_label.get(cls_id,
# class_names[cls_id] if class_names and cls_id < len(
# class_names) else str(
# cls_id))
# reversed_dict[cls_name] = count
# for key, value in reversed_dict.items():
# stats_text.append(f"{key}: {value}")
#
# if stats_text:
# # 计算统计文本的总高度
# text_height = cv2.getTextSize("Test", cv2.FONT_HERSHEY_SIMPLEX, fontScale=0.5, thickness=1)[0][
# 1]
# total_height = len(stats_text) * (text_height + 18) # 5是行间距
#
# # 统计文本的起始位置(左上角)
# start_x = 50 # 留出200像素宽度
# start_y = 20 # 从顶部开始
#
# # # 绘制统计背景
# # # bg_color = (0, 0, 0)
# # frame_copy = cv2.rectangle(
# # frame_copy,
# # (start_x - 10, start_y - 10),
# # (400, start_y + total_height + 20),
# # bg_color,
# # -1
# # )
#
# # # 逐行绘制统计文本
# # for i, text in enumerate(stats_text):
# # y_pos = start_y + i * (text_height + 30)
# # temp_img = frame_copy.copy()
# # frame_copy = put_chinese_text(
# # temp_img,
# # text,
# # (start_x, y_pos),
# # color=text_color
# # )
#
# new_data = {
# 'frame_copy': frame_copy,
# 'frame': frame,
# "osd_info": osd_info,
# 'detections': detections,
# "message_point": message_point,
# "cls_count": cls_count,
# "target_point": target_point,
# "model_list_func_id": model_list_func_id,
# "model_func_id": model_func_id,
# # 提取第一个模型的func_id 字段因为现在已经要做整合无法在区分各个提取第一个模型的func_id
# 'timestamp': processed_data.get('timestamp'),
# "detections_list": detections_list,
# "model_para": model_para
# # 'model_para': processed_data.get('model_para', {}) # 确保 model_para 存在
# }
# # 临时代码 rtmp 和侵限逻辑要改
# if invade_state:
# # para_list 中使能了 para_invade_enable才做侵限判断
# if para_invade_enable:
# if not invade_queue.full():
# await invade_queue.put(new_data)
# else:
# if not cv_frame_queue.full():
# await cv_frame_queue.put(new_data)
#
# if not cv_frame_thread_queue.full():
# cv_frame_thread_queue.put(new_data)
#
# time_end = time.time_ns()
# pic_count = pic_count + 1
# if time_end - time_start > 10000000000:
# print(f"writeFrames {pic_count}")
# pic_count = 0
# time_start = time_end
# event_queue.put({ # 用作测试录像
# "timestamp": time.time_ns()
# })
#
# #
# # count_p = count_p + 1
# # cv2.imwrite(f"save_pic/rtmp/test-{count_p}.jpg", frame_copy)
# # video_writer.write(frame_copy)
# # else:
# # frame_copy = frame
#
# # 转换颜色空间
# rgb_frame = cv2.cvtColor(frame_copy, cv2.COLOR_BGR2RGB)
#
# # # # # # 显示帧用于调试(可选)
# # cv2.imshow(f"write_results_to_rtmp-{task_id}", frame_copy)
# # if cv2.waitKey(1) & 0xFF == ord('q'):
# # stop_event.set()
# # break
#
# # 初始化推流容器(如果尚未初始化)
# if output_url and output_url not in stream_containers:
# try:
# # container = av.open(output_url, mode='w', format='flv')
# container = av.open(
# output_url,
# mode='w',
# format='flv',
# options={
# # 添加RTMP连接超时5秒和数据超时10秒
# 'rtmp_connect_timeout': '5000000', # 单位微秒5秒
# 'rtmp_timeout': '10000000', # 单位微秒10秒
# 'stimeout': '5000000' # 底层socket超时
# }
# )
# stream = container.add_stream(codec_name, rate=Config.TARGET_FPS)
# # stream.time_base = f"1/{Config.TARGET_FPS}"
#
# stream.width = frame.shape[1]
#
# stream.height = frame.shape[0]
# stream.pix_fmt = 'yuv420p'
# stream.options = options
#
# stream_containers[output_url] = {
# 'container': container,
# 'stream': stream,
# 'last_frame_time': time.time(),
# 'frame_count': 0,
# 'retry_count': 0
# }
# print(f"✅ 推流初始化成功: {output_url}")
# except Exception as e:
# print(f"❌ 推流初始化失败: {e}")
# if 'container' in locals():
# try:
# container.close()
# except:
# pass
# await asyncio.sleep(1.0)
# continue
#
# # 推流逻辑
# if output_url and output_url in stream_containers:
# try:
# container_info = stream_containers[output_url]
# stream = container_info['stream']
# container = container_info['container']
#
# if rgb_frame.dtype == np.uint8:
# av_frame = av.VideoFrame.from_ndarray(rgb_frame, format='rgb24')
# packets = stream.encode(av_frame) # 这是异步方法
# # print(f"📦 encode 生成 {len(packets)} 个 packet")
#
# if packets:
# for packet in packets:
# try:
# container.mux(packet)
# container_info['last_frame_time'] = time.time()
# container_info['frame_count'] += 1
# except Exception as e:
# logger.warning(f"推流数据包错误: {e}")
# container_info['retry_count'] += 1
# if container_info['retry_count'] > max_retries:
# raise
# else:
# # 编码器仍在初始化,不更新 last_frame_time
# pass
#
# # 每100帧打印一次状态
# # if container_info['frame_count'] % 100 == 0:
# # print(f" 已推送 {container_info['frame_count']} 帧到 {output_url}")
# if 'frame' in locals():
# frame_time = time.time() - frame_start
# print(f"推流帧耗时: {frame_time:.4f}s")
# else:
# print(f"⚠️ 无效帧格式: {rgb_frame.dtype}")
# except Exception as e:
# logger.error(f"❌ 推流错误: {e}")
# # 尝试重新初始化推流
# if output_url in stream_containers:
# try:
# stream_containers[output_url]['container'].close()
# except:
# pass
# del stream_containers[output_url]
# await asyncio.sleep(retry_delay)
# continue
#
#
# except asyncio.TimeoutError:
# # if stop_event.is_set():
# # break
# continue
# except asyncio.CancelledError:
# print("write_results_to_rtmp 收到取消信号")
# raise
# except Exception as e:
# logger.error(f"推流处理异常: {e}", exc_info=True)
# await asyncio.sleep(0.1)
#
# finally:
# # log_perf('write_results_to_rtmp', start_time)
#
# # 清理推流容器
# for url, info in list(stream_containers.items()):
# try:
# if 'container' in info:
# info['container'].close()
# except Exception as e:
# logger.warning(f"关闭推流容器时出错: {e}")
# stream_containers.clear()
#
# # 清理视频写入器
# if video_writer is not None:
# video_writer.release()
#
# print("write_results_to_rtmp读取线程已停止")
async def _refresh_ffmpeg_handles(output_url, stream_containers, width, height, fps,
                                  current_process, current_pipe):
    """Attempt an FFmpeg restart and return the handles to use afterwards.

    ``attempt_ffmpeg_recovery`` records the restarted process and its stdin
    pipe in ``stream_containers[output_url]`` but cannot rebind the caller's
    local variables. This helper re-reads that entry so the caller keeps
    writing to the live pipe. When the entry is missing, the handles passed in
    are returned unchanged.
    """
    await attempt_ffmpeg_recovery(output_url, stream_containers, width, height, fps)
    entry = stream_containers.get(output_url) or {}
    return entry.get('process', current_process), entry.get('pipe', current_pipe)


async def write_results_to_rtmp_high_speed(task_id: str, output_url: str = None, input_fps: float = None,
                                           list_points: list[list[any]] = None, camera_para: Camera_Para = None,
                                           invade_state: bool = False, cancel_flag: asyncio.Event = None,
                                           cv_frame_thread_queue: Queue = None,
                                           processed_queue: asyncio.Queue = None,
                                           invade_queue: asyncio.Queue = None,
                                           cv_frame_queue: asyncio.Queue = None,
                                           event_queue: Optional[Queue] = None,
                                           stream_containers: Dict[str, Any] = None,
                                           ):
    """Push processed frames to an RTMP endpoint through an FFmpeg subprocess.

    The coroutine takes dicts carrying a ``'frame'`` entry from
    ``processed_queue``, converts each frame from BGR to RGB and writes the raw
    bytes to FFmpeg's stdin. The first valid frame fixes the stream resolution.

    Args:
        task_id: Identifier used in log messages.
        output_url: RTMP target; when falsy, frames are consumed but not pushed.
        input_fps: Stream frame rate; falls back to ``Config.TARGET_FPS``.
        list_points, camera_para, invade_state, cv_frame_thread_queue,
        invade_queue, cv_frame_queue, event_queue: Accepted for interface
            compatibility; not used by this implementation.
        cancel_flag: Event that stops the loop when set. A private, never-set
            event is used when omitted.
        processed_queue: Source queue of processed frame dicts.
        stream_containers: Registry mapping ``output_url`` to the FFmpeg
            process/pipe and counters. A private dict is used when omitted.

    Returns:
        None. Cancellation and errors are logged; resources are released in
        ``finally``.
    """
    # Tolerate omitted optional collaborators instead of failing on first use.
    if stream_containers is None:
        stream_containers = {}
    if cancel_flag is None:
        cancel_flag = asyncio.Event()

    start_time = time.time()
    time_start = time.time_ns()
    pic_count = 0
    total_frames_processed = 0
    total_push_time = 0.0
    total_process_time = 0.0
    # FFmpeg handles; rebound after every recovery attempt.
    ffmpeg_process = None
    ffmpeg_input_pipe = None
    frame_width, frame_height = None, None
    fps = input_fps or Config.TARGET_FPS
    frame_interval = 1.0 / fps
    # Accumulated timings (currently collected only, not reported).
    perf_stats = {
        'encode_time': 0.0,
        'push_time': 0.0,
        'queue_wait': 0.0,
        'frame_process': 0.0
    }
    # First frame consumed during initialisation. It is pushed by the main loop
    # rather than re-queued: re-queuing would reorder frames and could block
    # forever on a full queue.
    pending_data = None
    try:
        logger.info(f"【推流初始化】开始推流任务 {task_id}, 目标FPS: {fps}, 输出URL: {output_url}")
        if output_url:
            try:
                logger.info(f"【推流初始化】尝试初始化FFmpeg推流到: {output_url}")
                # Fetch one frame to learn the resolution (up to 3 attempts).
                max_retries = 3
                frame_obtained = False
                processed_data = None
                for retry_count in range(max_retries):
                    logger.info(f"【推流初始化】尝试获取第一帧 (尝试 {retry_count + 1}/{max_retries})...")
                    try:
                        processed_data = await asyncio.wait_for(
                            processed_queue.get(),
                            timeout=3.0
                        )
                        logger.info(f"【推流初始化】第{retry_count + 1}次尝试获取到数据,类型: {type(processed_data)}")
                        if isinstance(processed_data, dict) and 'frame' in processed_data:
                            frame_obtained = True
                            logger.info(f"✅ 【推流初始化】成功获取有效帧数据")
                            break
                        # Invalid items are dropped, as the main loop does.
                        logger.warning(f"⚠️ 【推流初始化】获取到无效数据,重新尝试...")
                        processed_data = None
                    except asyncio.TimeoutError:
                        logger.warning(f"⏰ 【推流初始化】第{retry_count + 1}次尝试获取帧超时")
                        if cancel_flag.is_set():
                            logger.info("【推流初始化】任务被取消,退出")
                            return
                if not frame_obtained or processed_data is None:
                    logger.error("❌ 【推流初始化】无法获取有效的帧数据,推流任务终止")
                    return

                # Validate before touching frame attributes.
                frame = processed_data['frame']
                frame_shape = getattr(frame, 'shape', None)
                if frame is None or frame_shape is None or len(frame_shape) != 3 or frame_shape[2] != 3:
                    logger.error(f"❌ 【推流初始化】帧格式错误: shape={frame_shape}, 期望(h,w,3)")
                    return
                logger.info(f"【推流初始化】获取帧成功,形状: {frame.shape}, 数据类型: {frame.dtype}")
                frame_height, frame_width = frame.shape[:2]
                logger.info(f"✅ 【推流初始化】检测到输入分辨率: {frame_width}x{frame_height}")
                pending_data = processed_data

                logger.info(f"【推流初始化】启动FFmpeg进程...")
                try:
                    ffmpeg_process, ffmpeg_input_pipe = start_ffmpeg_stream(
                        output_url,
                        frame_width,
                        frame_height,
                        fps
                    )
                    logger.info(f"✅ 【推流初始化】FFmpeg进程启动成功进程ID: {ffmpeg_process.pid}")
                except Exception as e:
                    logger.error(f"❌ 【推流初始化】FFmpeg进程启动失败: {e}", exc_info=True)
                    raise
                if ffmpeg_process is None or ffmpeg_input_pipe is None:
                    logger.error("❌ 【推流初始化】FFmpeg进程或管道为None")
                    raise ValueError("FFmpeg进程或管道初始化失败")
                if ffmpeg_process.poll() is not None:
                    logger.error(f"❌ 【推流初始化】FFmpeg进程已退出返回码: {ffmpeg_process.returncode}")
                    # Best-effort diagnostics only.
                    if ffmpeg_process.stderr:
                        try:
                            stderr_output = ffmpeg_process.stderr.read()
                            logger.error(f"❌ 【推流初始化】FFmpeg错误输出: {stderr_output.decode('utf-8', errors='ignore')}")
                        except Exception as read_err:
                            logger.debug(f"【推流初始化】读取FFmpeg错误输出失败: {read_err}")
                    raise ValueError("FFmpeg进程启动后立即退出")
                stream_containers[output_url] = {
                    'process': ffmpeg_process,
                    'pipe': ffmpeg_input_pipe,
                    'frame_count': 0,
                    'last_frame_time': time.time(),
                    'retry_count': 0,
                    'width': frame_width,
                    'height': frame_height,
                    'fps': fps
                }
                logger.info(f"✅ 【推流初始化】FFmpeg推流初始化成功: {output_url}")
            except Exception as e:
                logger.error(f"❌ 【推流初始化】FFmpeg推流初始化失败: {e}", exc_info=True)
                await cleanup_ffmpeg(ffmpeg_process, ffmpeg_input_pipe, output_url, stream_containers)
                # Already released; stop ``finally`` from cleaning up twice.
                ffmpeg_process = None
                ffmpeg_input_pipe = None
                return

        logger.info(f"【推流启动】开始主推流循环目标FPS: {fps}")
        queue_timeout = 0.5
        consecutive_timeouts = 0
        max_consecutive_timeouts = 10
        while not cancel_flag.is_set():
            frame_start = time.time()
            process_start = time.perf_counter()
            try:
                get_start = time.perf_counter()
                if pending_data is not None:
                    # Frame consumed during initialisation goes out first.
                    processed_data, pending_data = pending_data, None
                else:
                    try:
                        processed_data = await asyncio.wait_for(
                            processed_queue.get(),
                            timeout=queue_timeout
                        )
                        consecutive_timeouts = 0
                    except asyncio.TimeoutError:
                        consecutive_timeouts += 1
                        # Only inspect the system after a run of timeouts.
                        if consecutive_timeouts >= max_consecutive_timeouts:
                            logger.warning(f"⚠️ 【队列状态】连续{consecutive_timeouts}次从队列获取数据超时")
                            if processed_queue.qsize() == 0:
                                logger.warning("⚠️ 【队列状态】处理队列为空,检查上游处理")
                            # Reset so the warning is not repeated every tick.
                            consecutive_timeouts = 0
                            if output_url and ffmpeg_process and ffmpeg_process.poll() is not None:
                                logger.error(f"🔴 【进程检查】FFmpeg进程已退出返回码: {ffmpeg_process.returncode}")
                                if output_url in stream_containers:
                                    ffmpeg_process, ffmpeg_input_pipe = await _refresh_ffmpeg_handles(
                                        output_url, stream_containers, frame_width, frame_height, fps,
                                        ffmpeg_process, ffmpeg_input_pipe)
                        continue
                perf_stats['queue_wait'] += time.perf_counter() - get_start

                if not isinstance(processed_data, dict) or 'frame' not in processed_data:
                    logger.warning(f"⚠️ 【帧处理】无效的帧数据: {type(processed_data)}")
                    continue
                frame = processed_data['frame']
                if frame is None or len(frame.shape) != 3 or frame.shape[2] != 3:
                    logger.warning(f"⚠️ 【帧处理】无效的帧: shape={frame.shape if frame is not None else 'None'}")
                    continue

                # Track the resolution; a change requires restarting FFmpeg
                # because the raw-video input size is fixed on its command line.
                current_height, current_width = frame.shape[:2]
                if frame_width is None or frame_height is None:
                    frame_width, frame_height = current_width, current_height
                    logger.info(f"📏 【分辨率设置】设置初始分辨率: {frame_width}x{frame_height}")
                if (current_width, current_height) != (frame_width, frame_height):
                    logger.warning(f"⚠️ 【分辨率警告】分辨率变化: 从 {frame_width}x{frame_height} 变为 {current_width}x{current_height}")
                    frame_width, frame_height = current_width, current_height
                    if output_url and output_url in stream_containers:
                        container_info = stream_containers[output_url]
                        if container_info['width'] != frame_width or container_info['height'] != frame_height:
                            logger.info(f"🔄 【分辨率变更】检测到分辨率变化重新初始化FFmpeg")
                            ffmpeg_process, ffmpeg_input_pipe = await _refresh_ffmpeg_handles(
                                output_url, stream_containers, frame_width, frame_height, fps,
                                ffmpeg_process, ffmpeg_input_pipe)

                if output_url and ffmpeg_input_pipe:
                    push_start = time.perf_counter()
                    try:
                        if ffmpeg_process.poll() is not None:
                            logger.error(f"🔴 【推流错误】FFmpeg进程已退出返回码: {ffmpeg_process.returncode}")
                            if output_url in stream_containers:
                                ffmpeg_process, ffmpeg_input_pipe = await _refresh_ffmpeg_handles(
                                    output_url, stream_containers, frame_width, frame_height, fps,
                                    ffmpeg_process, ffmpeg_input_pipe)
                            continue
                        # tobytes() below needs a C-contiguous buffer layout.
                        if not frame.flags['C_CONTIGUOUS']:
                            frame = np.ascontiguousarray(frame)
                        convert_start = time.perf_counter()
                        try:
                            rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                        except Exception as e:
                            logger.error(f"🎨 【颜色转换】BGR转RGB失败: {e}")
                            continue
                        perf_stats['encode_time'] += time.perf_counter() - convert_start

                        write_start = time.perf_counter()
                        try:
                            # NOTE(review): this write is synchronous and can
                            # stall the event loop if FFmpeg stops reading.
                            ffmpeg_input_pipe.write(rgb_frame.tobytes())
                            ffmpeg_input_pipe.flush()
                            perf_stats['push_time'] += time.perf_counter() - write_start
                            stream_info = stream_containers[output_url]
                            stream_info['frame_count'] += 1
                            stream_info['last_frame_time'] = time.time()
                            if stream_info['frame_count'] % 100 == 0:
                                logger.info(f"📤 【推流统计】{output_url} 已推流 {stream_containers[output_url]['frame_count']}")
                        except BrokenPipeError as e:
                            logger.error(f"🔌 【写入错误】管道损坏: {e}")
                            if output_url in stream_containers:
                                ffmpeg_process, ffmpeg_input_pipe = await _refresh_ffmpeg_handles(
                                    output_url, stream_containers, frame_width, frame_height, fps,
                                    ffmpeg_process, ffmpeg_input_pipe)
                            continue
                        except Exception as e:
                            logger.error(f"❌ 【写入错误】写入帧失败: {e}", exc_info=True)
                            continue
                        total_push_time += time.perf_counter() - push_start
                    except Exception as e:
                        logger.error(f"⚠️ 【推流异常】处理异常: {e}")
                        continue

                pic_count += 1
                total_frames_processed += 1
                total_process_time += time.perf_counter() - process_start

                # Report every 10 seconds (time_ns is in nanoseconds).
                if time.time_ns() - time_start > 10000000000:
                    elapsed_seconds = (time.time_ns() - time_start) / 1e9
                    actual_fps = pic_count / elapsed_seconds if elapsed_seconds > 0 else 0
                    # The sums are reset per window, so average over the
                    # window's frame count rather than the cumulative total.
                    avg_process_time = total_process_time / pic_count if pic_count > 0 else 0
                    avg_push_time = total_push_time / pic_count if pic_count > 0 else 0
                    queue_size = processed_queue.qsize()
                    logger.info(f"📊 【性能统计】任务: {task_id[:8]}... | "
                                f"帧数: {total_frames_processed} | "
                                f"FPS: {actual_fps:.2f} | "
                                f"队列: {queue_size} | "
                                f"处理: {avg_process_time*1000:.1f}ms | "
                                f"推流: {avg_push_time*1000:.1f}ms")
                    pic_count = 0
                    total_process_time = 0.0
                    total_push_time = 0.0
                    time_start = time.time_ns()

                # Pace the loop to the target frame interval.
                frame_time = time.time() - frame_start
                sleep_time = frame_interval - frame_time
                if sleep_time > 0:
                    await asyncio.sleep(sleep_time)
                elif frame_time > frame_interval * 1.5:
                    # Only severe overruns are worth a log line.
                    logger.debug(f"🐢 【帧率警告】处理时间 {frame_time*1000:.1f}ms 超过帧间隔 {frame_interval*1000:.1f}ms")
            except asyncio.CancelledError:
                logger.info(f"⏹️ 【推流任务】{task_id} 收到取消信号")
                raise
            except Exception as e:
                logger.error(f"❌ 【主循环异常】推流处理异常: {e}", exc_info=True)
                await asyncio.sleep(0.1)  # avoid a hot error loop
    except asyncio.CancelledError:
        logger.info(f"⏹️ 【任务取消】推流任务 {task_id} 被取消")
    except Exception as e:
        logger.error(f"❌ 【任务异常】推流任务 {task_id} 异常退出: {e}", exc_info=True)
    finally:
        logger.info(f"🧹 【清理资源】开始清理推流任务: {task_id}")
        if output_url in stream_containers:
            logger.info(f"🧹 【清理资源】清理容器中的资源: {output_url}")
        if ffmpeg_process or ffmpeg_input_pipe:
            await cleanup_ffmpeg(ffmpeg_process, ffmpeg_input_pipe, output_url, stream_containers)
        elapsed_time = time.time() - start_time
        avg_fps = total_frames_processed / elapsed_time if elapsed_time > 0 else 0.0
        status_msg = f"✅ 【任务完成】推流任务 {task_id[:8]}... "
        if total_frames_processed > 0:
            status_msg += f"成功推流 {total_frames_processed}平均FPS: {avg_fps:.2f}"
        else:
            status_msg += f"未处理任何帧"
        logger.info(status_msg)
async def attempt_ffmpeg_recovery(output_url, stream_containers, width, height, fps, max_retries=3):
    """Try to restart the FFmpeg push process registered for ``output_url``.

    The old pipe is closed and the old process terminated (killed if it does
    not exit within one second). A new process is then started through
    ``start_ffmpeg_stream`` and the ``stream_containers`` entry is replaced
    with the new handles and an incremented ``retry_count``.

    Args:
        output_url: Key of the stream inside ``stream_containers``.
        stream_containers: Registry dict holding per-URL stream state.
        width, height, fps: Parameters forwarded to ``start_ffmpeg_stream``.
        max_retries: Start attempts made by this call, and also the
            ``retry_count`` ceiling above which no attempt is made at all.

    Returns:
        True if a new FFmpeg process was started and registered, else False.
    """
    if output_url not in stream_containers:
        logger.error(f"❌ 【恢复失败】容器中没有找到: {output_url}")
        return False
    container = stream_containers[output_url]
    # Give up once the stream has already been restarted too many times.
    if container.get('retry_count', 0) >= max_retries:
        logger.error(f"❌ 【恢复失败】已达到最大重试次数: {max_retries}")
        return False

    loop = asyncio.get_running_loop()
    for retry in range(max_retries):
        try:
            logger.info(f"🔄 【重连尝试】第{retry + 1}次尝试重新连接FFmpeg...")
            # Release the previous handles (best effort).
            old_process = container.get('process')
            old_pipe = container.get('pipe')
            if old_pipe:
                try:
                    old_pipe.close()
                except Exception as close_err:
                    logger.debug(f"【重连清理】关闭旧管道失败(忽略): {close_err}")
            if old_process and old_process.poll() is None:
                try:
                    old_process.terminate()
                    # Wait off the event loop so other tasks keep running.
                    await loop.run_in_executor(None, lambda: old_process.wait(timeout=1))
                except subprocess.TimeoutExpired:
                    # SIGTERM was ignored: kill so the process is not leaked.
                    old_process.kill()
                    await loop.run_in_executor(None, old_process.wait)
                except Exception as term_err:
                    logger.debug(f"【重连清理】终止旧进程失败(忽略): {term_err}")

            ffmpeg_process, ffmpeg_input_pipe = start_ffmpeg_stream(
                output_url,
                width,
                height,
                fps
            )
            stream_containers[output_url] = {
                'process': ffmpeg_process,
                'pipe': ffmpeg_input_pipe,
                'frame_count': 0,
                'last_frame_time': time.time(),
                'retry_count': container.get('retry_count', 0) + 1,
                'width': width,
                'height': height,
                'fps': fps
            }
            logger.info(f"✅ 【重连成功】FFmpeg重新初始化成功重试次数: {stream_containers[output_url]['retry_count']}")
            return True
        except Exception as e:
            logger.error(f"❌ 【重连失败】第{retry + 1}次尝试失败: {e}")
            await asyncio.sleep(1)  # back off before the next attempt
    logger.error(f"❌ 【重连失败】所有{max_retries}次尝试都失败了")
    return False
def start_ffmpeg_stream(output_url: str, width: int, height: int, fps: int = 30):
    """Start an FFmpeg subprocess that encodes raw RGB frames from stdin to FLV.

    The process is given 0.1 s to start. If it has already exited by then, its
    stderr is logged, its pipes are closed and ``RuntimeError`` is raised.
    Otherwise a daemon thread is started that forwards FFmpeg's stderr lines
    to the logger.

    Args:
        output_url: Destination passed to FFmpeg as the output.
        width: Frame width in pixels of the raw input.
        height: Frame height in pixels of the raw input.
        fps: Input frame rate passed with ``-r``.

    Returns:
        Tuple ``(process, process.stdin)``.

    Raises:
        FileNotFoundError: The ``ffmpeg`` executable was not found.
        RuntimeError: FFmpeg exited right after launch.
    """
    logger.info(f"🚀 【启动FFmpeg】开始启动FFmpeg: {width}x{height}@{fps}fps -> {output_url}")
    # NOTE(review): '-re' paces input reads by timestamp; the caller in this
    # file already paces frames itself — confirm it is still wanted for piped
    # live input.
    ffmpeg_cmd = [
        'ffmpeg',
        '-re',  # read input at its native frame rate
        '-y',  # overwrite output
        '-f', 'rawvideo',  # input container format
        '-vcodec', 'rawvideo',  # input codec
        '-pix_fmt', 'rgb24',  # input pixel format
        '-s', f'{width}x{height}',  # input resolution
        '-r', str(fps),  # input frame rate
        '-i', '-',  # read from stdin
        '-c:v', 'libx264',  # video encoder
        '-preset', 'ultrafast',  # encoding speed preset
        '-tune', 'zerolatency',  # low-latency tuning
        '-pix_fmt', 'yuv420p',  # output pixel format
        '-b:v', '2000k',  # video bitrate
        '-maxrate', '2500k',  # maximum bitrate
        '-bufsize', '1000k',  # rate-control buffer size
        '-f', 'flv',  # output container format
        '-flvflags', 'no_duration_filesize',  # FLV muxer flag
        output_url
    ]
    logger.info(f"📋 【FFmpeg命令】执行命令: {' '.join(ffmpeg_cmd)}")
    try:
        logger.info("⏳ 【进程启动】启动FFmpeg子进程...")
        process = subprocess.Popen(
            ffmpeg_cmd,
            stdin=subprocess.PIPE,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.PIPE,
            bufsize=width * height * 3,  # one rgb24 frame
            text=False
        )
        time.sleep(0.1)  # give the process a moment to fail fast
        if process.poll() is not None:
            stderr_output = process.stderr.read()
            if stderr_output:
                error_msg = stderr_output.decode('utf-8', errors='ignore')
                logger.error(f"❌ 【FFmpeg启动失败】进程立即退出错误: {error_msg}")
            else:
                logger.error("❌ 【FFmpeg启动失败】进程立即退出无错误输出")
            # Close the pipes of the dead process explicitly rather than
            # relying on garbage collection.
            for stream in (process.stdin, process.stderr):
                try:
                    stream.close()
                except OSError:
                    pass  # pipe already broken; nothing left to release
            raise RuntimeError(f"FFmpeg进程启动失败返回码: {process.returncode}")
        logger.info(f"✅ 【FFmpeg启动】进程启动成功PID: {process.pid}")

        def read_stderr():
            """Forward FFmpeg stderr lines to the logger until EOF."""
            logger.info("👂 【读取日志】开始读取FFmpeg日志...")
            while True:
                try:
                    line = process.stderr.readline()
                    if not line:
                        break
                    line_str = line.decode('utf-8', errors='ignore').strip()
                    lowered = line_str.lower()
                    # Pick the log level from keywords in the line.
                    if 'error' in lowered or 'fail' in lowered:
                        logger.error(f"🔴 【FFmpeg错误】{line_str}")
                    elif 'warning' in lowered:
                        logger.warning(f"🟡 【FFmpeg警告】{line_str}")
                    elif 'frame=' in line_str and 'fps=' in line_str:
                        # Periodic progress line.
                        logger.debug(f"📈 【FFmpeg状态】{line_str}")
                    elif line_str:
                        logger.debug(f" 【FFmpeg信息】{line_str}")
                except Exception as e:
                    logger.error(f"❌ 【读取日志错误】读取FFmpeg日志失败: {e}")
                    break

        # Drain stderr in the background so FFmpeg never blocks on a full pipe.
        stderr_thread = threading.Thread(target=read_stderr, daemon=True)
        stderr_thread.start()
        logger.info("✅ 【日志线程】FFmpeg日志线程启动")
        return process, process.stdin
    except FileNotFoundError as e:
        logger.error(f"❌ 【FFmpeg未找到】请确保FFmpeg已安装并在PATH中: {e}")
        raise
    except Exception as e:
        logger.error(f"❌ 【FFmpeg启动异常】启动FFmpeg进程失败: {e}", exc_info=True)
        raise
async def cleanup_ffmpeg(process, pipe, output_url, stream_containers):
    """Release the FFmpeg pipe and process and drop the registry entry.

    Every step is best effort: failures are logged and never raised.

    Args:
        process: The FFmpeg ``subprocess.Popen`` object, or None.
        pipe: The stdin pipe used to feed frames, or None.
        output_url: Key of the stream inside ``stream_containers``.
        stream_containers: Registry dict holding per-URL stream state; may be
            None.
    """
    logger.info(f"🧹 【清理】开始清理FFmpeg资源: {output_url}")
    if pipe:
        logger.debug("🧹 【清理】关闭管道...")
        try:
            pipe.close()
            logger.debug("✅ 【清理】管道关闭成功")
        except Exception as e:
            logger.error(f"❌ 【清理】关闭管道错误: {e}")

    if process:
        try:
            logger.debug(f"🧹 【清理】终止FFmpeg进程 {process.pid}...")
            process.terminate()  # polite SIGTERM first
            loop = asyncio.get_running_loop()
            try:
                # Wait in a worker thread so the event loop stays responsive.
                await asyncio.wait_for(loop.run_in_executor(None, process.wait), timeout=5)
                logger.debug(f"✅ 【清理】FFmpeg进程 {process.pid} 正常终止")
            except asyncio.TimeoutError:
                # Still alive after 5 s: force it down and reap it.
                logger.warning(f"⚠️ 【清理】FFmpeg进程 {process.pid} 未正常退出,强制杀死")
                process.kill()
                await loop.run_in_executor(None, process.wait)
                logger.debug(f"✅ 【清理】FFmpeg进程 {process.pid} 强制杀死完成")
        except Exception as e:
            logger.error(f"❌ 【清理】终止FFmpeg进程错误: {e}")

    # Remove the registry entry, tolerating a missing registry.
    if output_url and stream_containers is not None and output_url in stream_containers:
        logger.debug(f"🧹 【清理】从容器中移除: {output_url}")
        del stream_containers[output_url]
        logger.debug(f"✅ 【清理】容器清理完成: {output_url}")
    logger.info(f"✅ 【清理完成】FFmpeg资源清理完成: {output_url}")
async def rtmp_process_async_high_speed(group_id: str, video_url: str, task_id: str,
                                        model_configs: List[Dict],
                                        mqtt_pub_ip: str, mqtt_pub_port: int,
                                        mqtt_pub_topic: str,
                                        mqtt_sub_ip: str, mqtt_sub_port: int,
                                        mqtt_sub_topic: str,
                                        output_rtmp_url: str,
                                        invade_enable: bool, invade_file: str,
                                        camera_para_url: str,
                                        cv_async_frame_queue: Queue, event_queue: Queue, invade_queue: Queue,
                                        stop_event: Event, performance_counter, timestamp_frame_queue):
    """Async main logic of process A (high-speed variant).

    Builds the in-process asyncio queues, loads the detector, optionally
    subscribes to MQTT, and then runs three tasks: ``read_rtmp_frames``,
    ``high_speed_process_frames`` and ``write_results_to_rtmp_high_speed``.
    It polls once per second until ``stop_event`` or the internal cancel flag
    is set, then cancels and awaits all tasks.

    The ``mqtt_pub_*`` and ``performance_counter`` arguments are accepted but
    not used here.

    Raises:
        Exception: Any non-cancellation error is logged and re-raised.
    """
    logger.info(f"[ProcessA] 启动高速处理引擎")
    logger.info(f"[ProcessA] 目标FPS: 20+")
    # Asyncio plumbing shared by the three pipeline tasks.
    cancel_flag = asyncio.Event()
    frame_queue = asyncio.Queue(maxsize=Config.FRAME_QUEUE_SIZE)
    processed_queue = asyncio.Queue(maxsize=Config.PROCESSED_QUEUE_SIZE * 2)  # enlarged queue
    cv_frame_queue = asyncio.Queue(maxsize=Config.PROCESSED_QUEUE_SIZE)
    stream_containers: Dict[str, Any] = {}
    loop = asyncio.get_running_loop()
    # Pre-declared so the outer ``finally`` can test them without ``locals()``.
    detector = None
    read_rtmp_executor = None
    try:
        # NOTE(review): list_points is never populated in this function, so
        # invade_state below is always False — confirm whether loading
        # ``invade_file`` is still pending.
        list_points = []
        camera_para = None
        if invade_file and camera_para_url:
            try:
                camera_file_path = downFile(camera_para_url)
                camera_para = read_camera_params(camera_file_path)
            except Exception as e:
                logger.warning(f"[ProcessA] 加载相机参数失败: {e}")

        detector = MultiYoloTrtDetectorTrackId_TRT10_YOLO11(model_configs)
        logger.info(f"[ProcessA] 模型加载完成")

        tasks = []
        # Optional MQTT subscription.
        # NOTE(review): the device is never explicitly closed here — check
        # whether MQTTDevice needs a disconnect call.
        device = None
        if mqtt_sub_ip and mqtt_sub_port and mqtt_sub_topic:
            try:
                device = MQTTDevice(host=mqtt_sub_ip, port=mqtt_sub_port,
                                    client_id=f"{task_id}_sub")
                device.subscribe(topic=mqtt_sub_topic, callback=empty_osd_callback)
                logger.info(f"[ProcessA] MQTT订阅已初始化: {mqtt_sub_topic}")
            except Exception as e:
                logger.warning(f"[ProcessA] MQTT初始化失败: {e}")

        try:
            read_rtmp_executor = ThreadPoolExecutor(max_workers=Config.READ_RTMP_WORKERS,
                                                    thread_name_prefix=f"{group_id}_read_rtmp")
            # Task 1: pull the input stream.
            tasks.append(asyncio.create_task(
                read_rtmp_frames(
                    loop=loop,
                    read_rtmp_frames_executor=read_rtmp_executor,
                    video_url=video_url,
                    device=device,
                    topic_camera_osd=mqtt_sub_topic,
                    method_camera_osd="drc_camera_osd_info_push",
                    topic_osd_info=mqtt_sub_topic,
                    method_osd_info="osd_info_push",
                    cancel_flag=cancel_flag,
                    frame_queue=frame_queue,
                    timestamp_frame_queue=timestamp_frame_queue
                ),
                name=f"{group_id}_read_rtmp"
            ))
            # Task 2: inference.
            tasks.append(asyncio.create_task(
                high_speed_process_frames(detector, cancel_flag, frame_queue, processed_queue),
                name=f"{group_id}_process_frames"
            ))
            # Task 3: push the output stream.
            invade_state = bool(list_points) and invade_enable
            tasks.append(asyncio.create_task(
                write_results_to_rtmp_high_speed(
                    task_id,
                    output_rtmp_url,
                    None,
                    list_points,
                    camera_para,
                    invade_state,
                    cancel_flag,
                    cv_async_frame_queue,
                    processed_queue,
                    invade_queue,
                    cv_frame_queue,
                    event_queue,
                    stream_containers
                ),
                name=f"{group_id}_write_rtmp"
            ))
            # Idle until asked to stop.
            while not (stop_event.is_set() or cancel_flag.is_set()):
                await asyncio.sleep(1)
        except asyncio.CancelledError:
            logger.info(f"[ProcessA] 任务被取消")
        except Exception as e:
            logger.error(f"[ProcessA] 任务异常: {e}")
            raise
        finally:
            # Stop every task and wait for it to unwind.
            cancel_flag.set()
            for task in tasks:
                if task and not task.done():
                    task.cancel()
            if tasks:
                await asyncio.gather(*tasks, return_exceptions=True)
    except Exception as e:
        logger.error(f"[ProcessA] 异常: {e}")
        raise
    finally:
        # Do not wait for worker threads: that would block the event loop.
        if read_rtmp_executor is not None:
            read_rtmp_executor.shutdown(wait=False)
        if detector is not None:
            detector.destroy()
        logger.info(f"[ProcessA] 高速处理结束")
def rtmp_process_main(group_id: str, video_url: str, task_id: str,
                      model_configs: List[Dict],
                      mqtt_pub_ip: str, mqtt_pub_port: int,
                      mqtt_pub_topic: str,
                      mqtt_sub_ip: str, mqtt_sub_port: int,
                      mqtt_sub_topic: str,
                      output_rtmp_url: str,
                      invade_enable: bool, invade_file: str,
                      camera_para_url: str,
                      group_queues: Dict):
    """Synchronous entry point of process A (high-speed variant).

    Logs the process id, runs ``check_cpu_affinity()``, then drives
    ``rtmp_process_async_high_speed`` to completion with ``asyncio.run``.
    Shared queues and events are looked up by name in ``group_queues``. Any
    exception is logged and, if an ``'error_queue'`` entry exists, reported
    there as a dict.
    """
    logger.info(f"[ProcessA] 启动 (PID:{os.getpid()})任务ID: {task_id}")
    # Inspect CPU state before starting the event loop.
    check_cpu_affinity()
    shared_keys = (
        'cv_frame_queue',
        'event_queue',
        'invade_queue',
        'stop_event',
        'performance_counter',
        'timestamp_frame_queue',
    )
    try:
        shared_objects = [group_queues.get(key) for key in shared_keys]
        main_coro = rtmp_process_async_high_speed(
            group_id, video_url, task_id, model_configs,
            mqtt_pub_ip, mqtt_pub_port, mqtt_pub_topic,
            mqtt_sub_ip, mqtt_sub_port, mqtt_sub_topic,
            output_rtmp_url, invade_enable, invade_file,
            camera_para_url,
            *shared_objects
        )
        asyncio.run(main_coro)
    except Exception as e:
        logger.error(f"[ProcessA] 异常: {e}")
        error_queue = group_queues.get('error_queue')
        if not error_queue:
            return
        report = {
            'group_id': group_id,
            'process': 'ProcessA',
            'error': str(e),
            'time': datetime.now()
        }
        error_queue.put(report)
async def cut_evnt_video_publish_test():
    """Placeholder coroutine: print its own name to stdout and finish."""
    marker = "cut_evnt_video_publish_test"
    print(marker)
#
# async def cut_evnt_video_publish(
# task_id: str,
# mqtt: MQTTService,
# mqtt_topic: str,
# cancel_flag: asyncio.Event,
# event_queue: Queue, # 修改为 multiprocessing.Queue
# timestamp_frame_queue: Queue # 修改为 multiprocessing.Queue
# ):
# """事件视频切割与发布"""
# print("cut_evnt_video_publish 启动")
# loop = asyncio.get_running_loop()
# upload_executor = ThreadPoolExecutor(max_workers=Config.UPLOAD_WORKERS)
# time_start = time.time_ns()
#
# while not cancel_flag.is_set():
# try:
# # 使用 Queue.get() 的阻塞版本(需要处理空队列情况)
# if event_queue.empty():
# await asyncio.sleep(0.1) # 短暂等待避免CPU占用
# continue
#
# # 从事件队列中获取事件(使用同步方式,因为 multiprocessing.Queue 不支持 async
# try:
# # 以下代码需要适配 multiprocessing.Queue 的同步获取方式
# # 由于 multiprocessing.Queue 没有 async 支持,需要改用同步获取
# # 这里假设 event_queue 是 multiprocessing.Queue
# # 需要通过 loop.run_in_executor 来同步获取
# def get_event():
# try:
# return event_queue.get_nowait()
# except queue.Empty:
# return None
#
# event = await loop.run_in_executor(None, get_event)
# if event is None:
# await asyncio.sleep(0.1)
# continue
#
# except Exception as e:
# print(f"获取事件失败: {e}")
# await asyncio.sleep(0.1)
# continue
#
# # 检查事件有效性
# if not isinstance(event, dict) or "timestamp" not in event:
# print(f"无效事件,跳过")
# continue
#
# timestamp = event.get("timestamp")
# if timestamp is None:
# print("⚠️ 警告:事件中缺少 timestamp")
# continue
#
# # 收集帧数据(同步获取)
# temp_list = []
# count1=0
# while not timestamp_frame_queue.empty():
# try:
# def get_frame():
# try:
#
# print(f"timestamp_frame_queue.qsize() {timestamp_frame_queue.qsize()} {count1}")
# return timestamp_frame_queue.get_nowait()
# except queue.Empty:
# return None
#
# item = await loop.run_in_executor(None, get_frame)
# if item is None:
# break
# temp_list.append(item)
# except Exception as e:
# print(f"获取帧失败: {e}")
# break
#
# if len(temp_list) < 1:
# continue
#
# # 取第一个帧的时间戳作为基准
# timestamp = temp_list[0]["timestamp"]
# frames = [item["frame"] for item in temp_list]
#
# # 生成视频
# video_bytes = frames_to_video_bytes(frames, fps=25, format="mp4")
# if video_bytes is None:
# print(f"⚠ ️警告:视频生成失败")
# continue
#
# # 上传与发布
# async def upload_and_publish():
# def upload_minio():
# return upload_video_buff_from_buffer(video_bytes, video_format="mp4")
#
# try:
# minio_path, file_type = await loop.run_in_executor(upload_executor, upload_minio)
# message = {
# "task_id": task_id,
# "time": str(datetime.now()),
# "detection_id": timestamp,
# "minio": {
# "minio_path": minio_path,
# "minio_origin_path": minio_path,
# "file_type": file_type
# },
# "box_detail": [],
# "osd_location": {},
# "des_location": []
# }
# print(f"成功上传视频: {message}")
# await mqtt.publish(mqtt_topic, json.dumps(message, ensure_ascii=False))
# except Exception as e:
# print(f"上传失败: {e}")
#
# await upload_and_publish()
#
# except Exception as e:
# print(f"[cut循环] 处理事件出错: {e}")
# await asyncio.sleep(0.1)
# except asyncio.CancelledError:
# print("cut_evnt_video_publish 收到取消信号")
# raise
#
# print("cut_evnt_video_publish 线程已停止")
async def cut_evnt_video_publish(
        task_id: str,
        mqtt: MQTTService,
        mqtt_topic: str,
        cancel_flag: asyncio.Event,
        event_queue: Queue,
        timestamp_frame_queue: Queue):
    """Cut an event video out of the buffered frames and publish it.

    Polls ``event_queue`` until ``cancel_flag`` is set. For every valid event
    (a dict carrying a non-``None`` ``"timestamp"``) the whole content of
    ``timestamp_frame_queue`` is drained, encoded to MP4 with
    ``frames_to_video_bytes``, uploaded with ``upload_video_buff_from_buffer``
    and announced on ``mqtt_topic`` as a JSON message.

    All blocking steps (queue draining, video encoding, upload) run in worker
    threads so the event loop stays responsive to cancellation and to other
    coroutines in the same process.

    Args:
        task_id: Copied into the published message.
        mqtt: Service whose awaitable ``publish(topic, payload)`` is used.
        mqtt_topic: Topic the message is published to.
        cancel_flag: Setting it ends the loop after the current iteration.
        event_queue: Queue of event dicts; only the presence of
            ``"timestamp"`` is checked.
        timestamp_frame_queue: Queue of dicts with ``"timestamp"`` and
            ``"frame"`` entries; emptied completely for every event.

    Raises:
        asyncio.CancelledError: Re-raised after logging when the task is
            cancelled.
    """
    print("cut_evnt_video_publish 启动")
    loop = asyncio.get_running_loop()
    upload_executor = ThreadPoolExecutor(max_workers=Config.UPLOAD_WORKERS)

    def extract_all_frames_and_clear():
        """Drain the frame queue and return the frames that were read.

        At most ``qsize()`` items (sampled once at the start) are returned;
        anything that arrives while draining is discarded so the next event
        starts from an empty queue. Returns ``[]`` on any error.
        """
        try:
            start_time = time.time()
            qsize = timestamp_frame_queue.qsize()
            print(f"准备从队列获取 {qsize} 帧...")
            temp_frames = []
            while len(temp_frames) < qsize:
                try:
                    temp_frames.append(timestamp_frame_queue.get_nowait())
                except queue.Empty:
                    break
                # Yield briefly every 100 frames to keep CPU usage down.
                if len(temp_frames) % 100 == 0:
                    time.sleep(0.001)
            print(f"实际获取到 {len(temp_frames)} 帧,耗时 {time.time() - start_time:.3f}")
            # Throw away whatever was queued in the meantime.
            while True:
                try:
                    timestamp_frame_queue.get_nowait()
                except queue.Empty:
                    break
            return temp_frames
        except Exception as e:
            print(f"提取帧失败: {e}")
            return []

    def get_event():
        """Return the next event without blocking, or ``None`` if there is none."""
        try:
            return event_queue.get_nowait()
        except queue.Empty:
            return None

    try:
        while not cancel_flag.is_set():
            try:
                if event_queue.empty():
                    await asyncio.sleep(0.1)
                    continue
                event = await loop.run_in_executor(None, get_event)
                if event is None:
                    await asyncio.sleep(0.1)
                    continue
                if not isinstance(event, dict) or "timestamp" not in event:
                    print(f"无效事件,跳过")
                    continue
                if event.get("timestamp") is None:
                    print("⚠️ 警告:事件中缺少 timestamp")
                    continue

                # Draining sleeps and may touch many items: keep it off the loop.
                temp_list = await loop.run_in_executor(None, extract_all_frames_and_clear)
                if len(temp_list) < 1:
                    print("未获取到帧数据,跳过")
                    continue
                print(f"成功获取 {len(temp_list)} 帧用于视频生成")

                # The first buffered frame's timestamp identifies the clip;
                # the event's own timestamp is only used for validation above.
                timestamp = temp_list[0]["timestamp"]
                frames = [item["frame"] for item in temp_list]

                # Encoding is CPU-bound; run it in a worker thread as well.
                video_bytes = await loop.run_in_executor(
                    None, lambda: frames_to_video_bytes(frames, fps=25, format="mp4"))
                if video_bytes is None:
                    print(f"⚠ ️警告:视频生成失败")
                    continue

                try:
                    minio_path, file_type = await loop.run_in_executor(
                        upload_executor,
                        lambda: upload_video_buff_from_buffer(video_bytes, video_format="mp4"))
                    message = {
                        "task_id": task_id,
                        "time": str(datetime.now()),
                        "detection_id": timestamp,
                        "minio": {
                            "minio_path": minio_path,
                            "minio_origin_path": minio_path,
                            "file_type": file_type
                        },
                        "box_detail": [],
                        "osd_location": {},
                        "des_location": []
                    }
                    print(f"成功上传视频: {message}")
                    await mqtt.publish(mqtt_topic, json.dumps(message, ensure_ascii=False))
                except Exception as e:
                    print(f"上传失败: {e}")
            except Exception as e:
                # Keep the loop alive on per-event failures.
                print(f"[cut循环] 处理事件出错: {e}")
                await asyncio.sleep(0.1)
    except asyncio.CancelledError:
        print("cut_evnt_video_publish 收到取消信号")
        raise
    finally:
        # Release the worker threads; do not block the loop waiting for them.
        upload_executor.shutdown(wait=False)
    print("cut_evnt_video_publish 线程已停止")
# async def cut_evnt_video_publish(task_id, mqtt, mqtt_topic, cancel_flag: asyncio.Event,
# # event_queue: asyncio.Queue = None,
# event_queue: Queue = None,
# timestamp_frame_queue: Queue = None):
# print("cut_evnt_video_publishcut_evnt_video_publish")
# loop = asyncio.get_running_loop()
# upload_executor = ThreadPoolExecutor(max_workers=Config.UPLOAD_WORKERS)
# time_start = time.time_ns()
# while not cancel_flag.is_set():
# print("cut_evnt_video_publishcut_evnt_video_publish111111")
#
# try:
# # 从事件队列中获取事件(带超时,避免阻塞)
# try:
# event = await asyncio.wait_for(event_queue.get(), timeout=1.0)
# except asyncio.TimeoutError:
# continue # 队列为空,继续下一轮循环
# #
# # if not isinstance(event, dict) or "timestamp" not in event:
# # print(f"[cut循环] 无效事件,跳过")
# # continue
# #
# timestamp = event.get("timestamp")
# if timestamp is None:
# print("⚠️ 警告:事件中缺少 timestamp")
# continue
# # 以下是测试代码,用作录像测试
# # 基于frame_len 控制生成视频长度
# # matched_items = timestamp_frame_queue.query_by_timestamp(timestamp, 150)
# # print(f"[cut循环] 匹配到 {len(matched_items)} 个帧")
# # if len(matched_items) == 0:
# # continue
#
# time_end = time.time_ns()
# # time.sleep(3)
# print(f"cut_evnt_video_publish {timestamp_frame_queue.qsize()}")
# # if time_end - time_start < 5000000000:
# # continue
# time_start = time_end
# temp_list = []
# while not timestamp_frame_queue.empty():
# try:
# item = timestamp_frame_queue.get_nowait() # 非阻塞获取
# temp_list.append(item)
# except: # 如果队列为空或出错,跳过
# break
# if len(temp_list) <1:
# continue
#
# timestamp=temp_list[0]["timestamp"]
#
# frames = [item["frame"] for item in temp_list]
# if not frames: # 显式检查帧列表是否为空
# print("⚠️ 警告:匹配到的帧列表为空")
# continue
#
# video_bytes = frames_to_video_bytes(frames, fps=25, format="mp4")
# if video_bytes is None: # 检查返回值
# print(f"⚠ 警告视频生成失败返回None {len(frames)}")
# continue
#
# async def upload_and_publish():
# def upload_minio():
# return upload_video_buff_from_buffer(video_bytes, video_format="mp4")
#
# try:
# minio_path, file_type = await loop.run_in_executor(upload_executor, upload_minio)
# # message = {"detection_id": timestamp, "minio_path": minio_path}
# message = {
# "task_id": task_id,
# "time": str(datetime.now()),
# "detection_id": timestamp,
# "minio": {"minio_path": minio_path,
# "minio_origin_path": minio_path,
# "file_type": file_type},
# "box_detail": [{
#
# }],
# "osd_location": {
# },
# "des_location": []
# }
# print(f"成功上传视频: {message}")
# await mqtt.publish(mqtt_topic, json.dumps(message, ensure_ascii=False))
# except Exception as e:
# print(f"上传失败: {e}")
#
# asyncio.create_task(upload_and_publish())
#
# except Exception as e:
# print(f"[cut循环] 处理事件出错2: {e}")
# finally:
# # if event is not None:
# # event_queue.task_done()
# # print(f"[cut循环] 确认消费,剩余未完成任务: {event_queue._unfinished_tasks}")
#
# if event_queue.qsize() > 0 and event_queue._unfinished_tasks == 0:
# try:
# event_queue.put_nowait(None)
# dummy = await event_queue.get()
# event_queue.task_done()
# except:
# pass
def get_local_drc_message():
    """Take the pending DRC message out of the module-level slot.

    Returns the value currently held in ``local_drc_message`` and resets the
    slot to ``None``; returns ``None`` when nothing is pending.
    """
    global local_drc_message
    pending = local_drc_message
    if pending is None:
        return None
    local_drc_message = None
    return pending
async def send_frame_to_s3_mq(loop, upload_executor, task_id, mqtt, mqtt_topic, cancel_flag: asyncio.Event,
                              cv_frame_queue: asyncio.Queue,
                              event_queue: asyncio.Queue = None,
                              device_height: float = float(200), repeat_dis: float = -1, repeat_time: float = -1):
    """Consume detection results, upload annotated frames and publish events.

    Runs until ``cancel_flag`` is set. Each iteration takes one dict from
    ``cv_frame_queue``, applies the optional time-based and distance-based
    de-duplication, converts the target pixels to locations with
    ``cal_canv_location_by_osd``, renders every model's detections with
    ``cal_tricker_results``, JPEG-encodes the rendered and the original frame,
    uploads both with ``upload_frame_buff_from_buffer`` and publishes a JSON
    message on ``mqtt_topic``. For every published message a
    ``{"timestamp": ...}`` entry is also put on ``event_queue``.

    Args:
        loop: Event loop whose ``run_in_executor`` is used for blocking work.
        upload_executor: Executor handed to ``run_in_executor``.
        task_id: Copied into the published message.
        mqtt: Object providing an awaitable ``publish(topic, payload)``.
        mqtt_topic: Topic the message is published to.
        cancel_flag: Setting it ends the loop.
        cv_frame_queue: Queue of dicts with the keys ``frame_copy``, ``frame``,
            ``target_point``, ``detections``, ``detections_list`` and
            ``osd_info``; ``model_para`` and ``timestamp`` are optional.
        event_queue: Receives one ``{"timestamp": ...}`` dict per upload.
            NOTE(review): defaults to ``None`` but is used unconditionally.
        device_height: Height value passed for every target point to
            ``cal_canv_location_by_osd``.
        repeat_dis: When > 0, a frame whose first computed location is closer
            than this to the cached one is skipped (compared against the
            result of ``haversine`` — unit presumably metres, TODO confirm).
        repeat_time: When > 0, minimum number of seconds between two
            processed frames (the very first frame is always processed).

    Raises:
        asyncio.CancelledError: Re-raised after logging.

    NOTE(review): the nesting below was reconstructed from a copy of the file
    that had lost its indentation; confirm it against the original, in
    particular which statements belong under ``if len(us) > 0:``.
    """
    global stats
    start_time = time.time()
    print("send_frame_to_s3_mq")
    track_filter = TrackIDEventFilter(max_inactive_time=10)
    reported_track_ids = defaultdict(float)
    count_pic = 0
    report_interval = 8
    local_func_cache = {
        "func_100000": None,
        "func_100004": None,  # cache slot for person track_ids
        "func_100006": None  # cache slot for vehicle track_ids
    }
    para = {
        "category": 3
    }
    target_location_back = []  # last reported locations, used by the distance de-duplication
    current_time_second = int(time.time())
    repeat_time_count = 0
    while not cancel_flag.is_set():
        upload_start = time.time()
        try:
            # Guard against a backlog: if the queue is more than half full,
            # drop everything and wait for fresh data.
            if cv_frame_queue.qsize() > Config.PROCESSED_QUEUE_SIZE // 2:
                print(f"警告cv_frame_queue 积压(当前长度={cv_frame_queue.qsize()}),清空队列")
                while not cv_frame_queue.empty():
                    await cv_frame_queue.get()
                print("队列已清空,等待新数据...")
                await asyncio.sleep(0.1)
                continue
            # Fetch the next item, giving up after 50 ms.
            try:
                cv_frame = await asyncio.wait_for(cv_frame_queue.get(), timeout=0.05)
                # Anything that is not a dict is ignored.
                if not isinstance(cv_frame, dict):
                    print(f"⚠️ 警告cv_frame 不是字典,而是 {type(cv_frame)}")
                    continue
            except asyncio.TimeoutError:
                continue
            if repeat_time > 0:  # time-based de-duplication enabled
                read_cv_time_second = int(time.time())
                if read_cv_time_second - current_time_second < repeat_time and repeat_time_count > 0:
                    # Too soon after the previous processed frame.
                    continue
                else:
                    print("没有触发事件去重")
                    repeat_time_count += 1  # makes sure the very first target is not dropped
                    current_time_second = read_cv_time_second
            # Unpack the queue item.
            frame_copy = cv_frame['frame_copy']
            frame = cv_frame['frame']
            target_point = cv_frame['target_point']
            detections = cv_frame['detections']
            detections_list = cv_frame['detections_list']
            model_para_list = cv_frame.get('model_para', {})  # defaults to an empty dict
            timestamp = cv_frame.get('timestamp', time.time_ns())  # defaults to the current time in ns
            air_alti = cv_frame['osd_info']
            img_height, img_width = frame.shape[:2]
            gimbal_yaw = air_alti.gimbal_yaw
            gimbal_pitch = air_alti.gimbal_pitch
            gimbal_roll = air_alti.gimbal_roll
            height = air_alti.height
            cam_longitude = air_alti.longitude
            cam_latitude = air_alti.latitude
            # Defaults used when no model produces a rendered frame.
            frame11 = frame_copy  # fall back to the frame copy from the queue item
            box_detail = []  # empty by default
            current_time = time.time()
            h = device_height
            us = []
            vs = []
            heights = []
            should_report = True
            print(f"target_pointtarget_point {len(target_point)}")
            count_item = 0
            des_location_result = []
            for item in target_point:
                # Collect the pixel coordinates of every target until the
                # track filter rejects one of them.
                u = item["u"]
                v = item["v"]
                cls_id = item["cls_id"]
                track_id = item["track_id"]
                new_track_id = item["new_track_id"]
                print(f"方法 send_frame——to {count_item}")
                count_item += 1
                if not track_filter.should_report(track_id):
                    # One rejected track stops the collection; points gathered
                    # before it stay in us/vs.
                    should_report = False
                    break
                if should_report:
                    us.append(u)
                    vs.append(v)
                    heights.append(h)
            if len(us) > 0:
                print("进行侵限计算")
                location_results = cal_canv_location_by_osd(us, vs, gimbal_pitch, gimbal_yaw, gimbal_roll,
                                                            height, cam_longitude, cam_latitude, img_width, img_height,
                                                            heights)
                if not location_results:
                    continue
                if location_results:
                    # Only the first computed location is reported.
                    des1 = location_results[0]
                    des1_longitude = des1[0]
                    des1_latitude = des1[1]
                    des1_height = des1[2]
                    des_location_result.append({"longitude": des1_longitude,
                                                "latitude": des1_latitude})
                repeat_state = False
                show_des = 0
                str_loca = ""
                if repeat_dis > 0:  # distance de-duplication is enabled only for a positive repeat_dis
                    if len(target_location_back) > 0:  # not rigorous: only the first location is compared
                        des1_back = target_location_back[0]
                        des1_back_longitude = des1_back[0]
                        des1_back_latitude = des1_back[1]
                        des1_back_height = des1_back[2]
                        des1 = location_results[0]
                        des1_longitude = des1[0]
                        des1_latitude = des1[1]
                        des1_height = des1[2]
                        des_latitude = []
                        str_loca = f"{des1_back_longitude}:{des1_back_latitude}---{des1_longitude}{des1_latitude}"
                        des = haversine(des1_back_longitude, des1_back_latitude, des1_longitude, des1_latitude)
                        show_des = des
                        if des < repeat_dis:
                            print(f"触发基于坐标判断重复,坐标距离:{des}")
                            repeat_state = True
                        else:
                            target_location_back = location_results  # not a duplicate: refresh the cached location
                    else:
                        target_location_back = location_results  # nothing cached yet: remember this location
                if not repeat_state:  # not a duplicate: render, upload and publish
                    for idx, model_para in enumerate(model_para_list):
                        if not isinstance(detections_list, list) or idx >= len(detections_list):
                            continue
                        detections = detections_list[idx]  # detections belonging to the current model
                        chinese_label = model_para.get("model_chinese_labe", {})
                        model_cls = model_para.get("model_cls_index", {})
                        list_func_id = model_para.get("model_list_func_id", -11)
                        func_id = model_para.get("func_id", [])
                        # Fetch the pending DRC message in the thread pool.
                        # This name is local to the coroutine; the module-level
                        # slot is only touched inside get_local_drc_message.
                        local_drc_message = await loop.run_in_executor(upload_executor, get_local_drc_message)
                        if detections is None or len(detections.boxes) < 1:
                            continue
                        try:
                            # Draw the detections and compute the box details.
                            frame11, box_detail1 = await loop.run_in_executor(
                                upload_executor,
                                cal_tricker_results,
                                frame_copy, detections, None,
                                func_id, local_func_cache, para, model_cls, chinese_label, list_func_id
                            )
                            print(f"cal_tricker_results")
                            print(f"func_id {func_id}")
                            print(f"local_func_cache {local_func_cache}")
                            print(f"para {para}")
                            print(f"model_cls {model_cls}")
                            print(f"chinese_label {chinese_label}")
                            print(f"list_func_id {list_func_id}")
                            print(f"box_detail1 {len(box_detail1)}")
                        except Exception as e:
                            print(f"处理帧时出错: {str(e)}")
                            import traceback
                            traceback.print_exc()
                            continue
                        count_pic = count_pic + 1

                        # JPEG encoding helpers, executed in the thread pool.
                        def encode_frame():
                            success, buffer = cv2.imencode(".jpg", frame11)
                            return buffer.tobytes() if success else None

                        def encode_origin_frame():
                            success, buffer = cv2.imencode(".jpg", frame)
                            return buffer.tobytes() if success else None

                        buffer_bytes = await loop.run_in_executor(upload_executor, encode_frame)
                        buffer_origin_bytes = await loop.run_in_executor(upload_executor, encode_origin_frame)
                        if not buffer_bytes:
                            continue

                        # Upload and MQTT publication run as a separate task.
                        # NOTE(review): this closure reads buffer_bytes,
                        # buffer_origin_bytes, box_detail1, local_drc_message,
                        # timestamp, etc. when it runs, not when it is created;
                        # a later iteration may already have rebound them.
                        async def upload_and_publish():
                            # Upload both images to MinIO.
                            def upload_minio():
                                minio_path, file_type = upload_frame_buff_from_buffer(buffer_bytes, None)
                                minio_origin_path, file_type = upload_frame_buff_from_buffer(buffer_origin_bytes, None)
                                return minio_path, minio_origin_path, file_type

                            minio_path, minio_origin_path, file_type = await loop.run_in_executor(
                                upload_executor, upload_minio
                            )
                            # Build the event message.
                            message = {
                                "task_id": task_id,
                                "time": str(datetime.now()),
                                "detection_id": timestamp,
                                "minio": {"minio_path": minio_path, "minio_origin_path": minio_origin_path,
                                          "file_type": file_type},
                                "box_detail": box_detail1,
                                "uav_location": local_drc_message,
                                "osd_location": {
                                    "longitude": cam_longitude,
                                    "latitude": cam_latitude
                                },
                                "des_location": des_location_result
                            }
                            await event_queue.put({
                                "timestamp": timestamp  # moment the event fired; used for video cutting
                            })
                            message_json = json.dumps(message, ensure_ascii=False)
                            await mqtt.publish(mqtt_topic, message_json)

                        # NOTE(review): the task reference is not kept, so
                        # exceptions raised inside it are never awaited here.
                        asyncio.create_task(upload_and_publish())
            if 'frame' in locals():
                upload_time = time.time() - upload_start
                print(f"上传耗时: {upload_time:.4f}s")
        except Exception as e:
            print(f"send_frame_to_s3_mq 错误: {e}")
            import traceback
            traceback.print_exc()
            await asyncio.sleep(0.1)
        except asyncio.CancelledError:
            print("send_frame_to_s3 收到取消信号")
            raise
        # Update the throughput statistics (skipped by every `continue` above).
        stats['processed'] += 1
        if time.time() - stats['last_time'] >= 1.0:
            stats['avg_fps'] = stats['processed'] / (time.time() - stats['last_time'])
            print(f"处理速度: {stats['avg_fps']:.2f} FPS")
            stats['processed'] = 0
            stats['last_time'] = time.time()
    print("send_frame_to_s3线程已停止")
async def cal_des_invade(loop, invade_executor, task_id: str, mqtt, mqtt_publish_topic,
                         list_points: list[list[any]], camera_para: Camera_Para, model_count: int,
                         cancel_flag: asyncio.Event = None, invade_queue: asyncio.Queue = None,
                         event_queue: asyncio.Queue = None,
                         device_height: float = float(200), repeat_dis: float = -1, repeat_time: float = -1):
    """Consume intrusion candidates, upload the frames and publish events.

    Runs until ``cancel_flag`` is set. Each iteration takes one dict from
    ``invade_queue``, applies the optional time-based and distance-based
    de-duplication, converts the target pixels to locations with
    ``cal_canv_location_by_osd``, JPEG-encodes ``frame_copy`` and ``frame``,
    uploads both with ``upload_frame_buff_from_buffer`` and publishes a JSON
    message on ``mqtt_publish_topic``. A ``{"timestamp": ...}`` entry is put
    on ``event_queue`` when that queue is not full.

    Args:
        loop: Event loop whose ``run_in_executor`` is used for blocking work.
        invade_executor: Executor handed to ``run_in_executor``.
        task_id: Copied into the published message.
        mqtt: Object providing an awaitable ``publish(topic, payload)``.
        mqtt_publish_topic: Topic the message is published to.
        list_points: Not used in this function body.
        camera_para: Not used in this function body.
        model_count: Not used in this function body.
        cancel_flag: Setting it ends the loop. NOTE(review): defaults to
            ``None`` but ``is_set()`` is called unconditionally.
        invade_queue: Queue of dicts with the keys ``frame_copy``, ``frame``,
            ``target_point``, ``message_point``, ``cls_count``,
            ``model_list_func_id``, ``model_func_id``, ``osd_info``,
            ``detections`` and ``detections_list``; ``model_para`` and
            ``timestamp`` are optional. Defaults to ``None`` but is used
            unconditionally.
        event_queue: Receives one ``{"timestamp": ...}`` dict per upload;
            defaults to ``None`` but is used unconditionally.
        device_height: Height value passed for every target point to
            ``cal_canv_location_by_osd``.
        repeat_dis: When > 0, a frame whose first computed location is closer
            than this to the cached one is skipped (compared against the
            result of ``haversine`` — unit presumably metres, TODO confirm).
        repeat_time: When > 0, minimum number of seconds between two
            processed frames.

    Raises:
        asyncio.CancelledError: Re-raised after logging when it occurs while
            waiting on the queue.

    NOTE(review): the nesting below was reconstructed from a copy of the file
    that had lost its indentation; confirm it against the original.
    """
    print("cal_des_invade")
    pic_count_hongxian = 0
    track_filter = TrackIDEventFilter(max_inactive_time=8.0)
    # Reported track_ids and the time they were reported (currently unused).
    reported_track_ids = defaultdict(float)
    # Reporting interval in seconds (currently unused).
    report_interval = 8
    target_location_back = []  # last reported locations, used by the distance de-duplication
    current_time_second = int(time.time())
    while not cancel_flag.is_set():
        # Guard against a backlog: if the queue is more than half full,
        # drop everything and wait for fresh data.
        if invade_queue.qsize() > Config.PROCESSED_QUEUE_SIZE // 2:
            print(f"警告invade_queue 积压(当前长度={invade_queue.qsize()}),清空队列")
            while not invade_queue.empty():
                await invade_queue.get()
            print("队列已清空,等待新数据...")
            await asyncio.sleep(0.1)
            continue
        # Fetch the next item, giving up after 0.8 s.
        try:
            cv_frame = await asyncio.wait_for(invade_queue.get(), timeout=0.8)
            # Anything that is not a dict is ignored.
            if not isinstance(cv_frame, dict):
                print(f"⚠️ 警告cv_frame 不是字典,而是 {type(cv_frame)}")
                continue
        except asyncio.TimeoutError:
            print(f"cal_des_invade TimeoutError")
            continue
        except asyncio.CancelledError:
            print("cal_des_invade 收到取消信号")
            raise
        if cv_frame is None:
            continue
        if repeat_time > 0:  # time-based de-duplication enabled
            read_cv_time_second = int(time.time())
            if read_cv_time_second - current_time_second < repeat_time:
                continue
            else:
                current_time_second = read_cv_time_second
        # Unpack the queue item.
        frame_copy = cv_frame['frame_copy']
        frame = cv_frame['frame']
        target_point = cv_frame['target_point']
        message_point = cv_frame['message_point']
        cls_count = cv_frame['cls_count']
        model_list_func_id = cv_frame['model_list_func_id']
        model_func_id = cv_frame['model_func_id']
        air_alti = cv_frame['osd_info']
        detections = cv_frame['detections']
        detections_list = cv_frame['detections_list']
        model_para_list = cv_frame.get('model_para', {})  # defaults to an empty dict
        timestamp = cv_frame.get('timestamp', time.time_ns())  # defaults to the current time in ns
        if not isinstance(frame, np.ndarray) or frame.size == 0:
            print("⚠️ 警告frame不是有效的numpy数组")
            continue
        # Image size in pixels.
        img_height, img_width = frame.shape[:2]
        if not air_alti:
            continue
        gimbal_yaw = air_alti.gimbal_yaw
        gimbal_pitch = air_alti.gimbal_pitch
        gimbal_roll = air_alti.gimbal_roll
        height = air_alti.height
        cam_longitude = air_alti.longitude
        cam_latitude = air_alti.latitude
        try:
            current_time = time.time()
            h = device_height
            us = []
            vs = []
            heights = []
            should_report = True
            for item in target_point:
                # Collect the pixel coordinates of every target until the
                # track filter rejects one of them.
                u = item["u"]
                v = item["v"]
                cls_id = item["cls_id"]
                track_id = item["track_id"]
                new_track_id = item["new_track_id"]
                print(f"方法 cal_des_invade")
                if not track_filter.should_report(track_id):
                    # One rejected track suppresses the whole frame.
                    should_report = False
                    break
                if should_report:
                    us.append(u)
                    vs.append(v)
                    heights.append(h)
            if should_report:
                print("进行侵限计算")
                location_results = cal_canv_location_by_osd(us, vs, gimbal_pitch, gimbal_yaw, gimbal_roll,
                                                            height, cam_longitude, cam_latitude, img_width, img_height,
                                                            heights)
                if not location_results:
                    continue
                repeat_state = False
                show_des = 0
                str_loca = ""
                # NOTE(review): only filled inside the distance check below, so
                # it stays empty when repeat_dis <= 0 or nothing is cached yet.
                des_location_result = []
                if repeat_dis > 0:  # distance de-duplication is enabled only for a positive repeat_dis
                    if len(target_location_back) > 0:  # not rigorous: only the first location is compared
                        des1_back = target_location_back[0]
                        des1_back_longitude = des1_back[0]
                        des1_back_latitude = des1_back[1]
                        des1_back_height = des1_back[2]
                        des1 = location_results[0]
                        des1_longitude = des1[0]
                        des1_latitude = des1[1]
                        des_location_result.append({"longitude": des1_longitude,
                                                    "latitude": des1_latitude})
                        des1_height = des1[2]
                        str_loca = f"{des1_back_longitude}:{des1_back_latitude}---{des1_longitude}{des1_latitude}"
                        des = haversine(des1_back_longitude, des1_back_latitude, des1_longitude, des1_latitude)
                        show_des = des
                        if des < repeat_dis:
                            print(f"触发基于坐标判断重复,坐标距离:{des}")
                            repeat_state = True
                        else:
                            target_location_back = location_results  # not a duplicate: refresh the cached location
                    else:
                        target_location_back = location_results  # nothing cached yet: remember this location
                pic_count_hongxian = pic_count_hongxian + 1
                if not repeat_state:  # not a duplicate: encode, upload and publish
                    print(f"未发现重复,现在上传{str_loca}======{show_des}")

                    # JPEG encoding helpers, executed in the thread pool.
                    def encode_origin_frame():
                        success, buffer = cv2.imencode(".jpg", frame)
                        return buffer.tobytes() if success else None

                    def encode_frame():
                        success, buffer = cv2.imencode(".jpg", frame_copy)
                        return buffer.tobytes() if success else None

                    buffer_bytes = await loop.run_in_executor(invade_executor, encode_frame)
                    buffer_origin_bytes = await loop.run_in_executor(invade_executor, encode_origin_frame)
                    if not buffer_bytes:
                        continue

                    # Upload and MQTT publication run as a separate task.
                    # NOTE(review): this closure reads buffer_bytes,
                    # location_results, timestamp, etc. when it runs, not when
                    # it is created; a later iteration may have rebound them.
                    async def upload_and_publish():
                        # Upload both images to MinIO.
                        def upload_minio():
                            minio_path, file_type = upload_frame_buff_from_buffer(buffer_bytes, None)
                            minio_origin_path, file_type = upload_frame_buff_from_buffer(buffer_origin_bytes, None)
                            return minio_path, minio_origin_path, file_type

                        minio_path, minio_origin_path, file_type = await loop.run_in_executor(
                            invade_executor, upload_minio
                        )
                        print(f"minio_pathminio_pathminio_pathminio_path {minio_path}")
                        # Build the event message.
                        message = {
                            "task_id": task_id,
                            "time": str(datetime.now()),
                            "detection_id": timestamp,
                            "minio": {"minio_path": minio_path,
                                      "minio_origin_path": minio_origin_path,
                                      "file_type": file_type},
                            "box_detail": [{
                                "model_id": model_func_id,
                                "cls_count": cls_count,
                                "box_count": [message_point],  # special case: wrapped in a list
                                "location_results": location_results  # computed longitude/latitude data
                            }],
                            "osd_location": {
                                "longitude": cam_longitude,
                                "latitude": cam_latitude
                            },
                            "des_location": des_location_result
                        }
                        if not event_queue.full():
                            await event_queue.put({
                                "timestamp": timestamp  # moment the event fired; used for video cutting
                            })
                        else:
                            # NOTE(review): despite the log text there is no
                            # retry; the event entry is dropped after the sleep.
                            logger.warning("event_queue 帧队列已满等待1ms后重试")
                            await asyncio.sleep(0.001)
                        print(f"hongxianhongxianhongxianhongxian上传 {message}:{event_queue.qsize()} ")
                        message_json = json.dumps(message, ensure_ascii=False)
                        await mqtt.publish(mqtt_publish_topic, message_json)

                    # NOTE(review): the task reference is not kept, so
                    # exceptions raised inside it are never awaited here.
                    asyncio.create_task(upload_and_publish())
        except Exception as e:
            print(f"cal_des_invade 错误: {e}")
            import traceback
            traceback.print_exc()
            await asyncio.sleep(0.1)
    print("cal_des_invade读取线程已停止")
def upload_process_main(group_id: str, task_id: str,
                        mqtt_pub_ip: str, mqtt_pub_port: int,
                        mqtt_pub_topic: str,
                        device_height: float, repeat_dis: float,
                        repeat_time: float,
                        group_queues: Dict):
    """Entry point of process B: run the asynchronous upload tasks.

    Lowers the process priority, connects to the MQTT broker and runs the
    event-video task (``cut_evnt_video_publish``) until the ``stop_event``
    found in ``group_queues`` is set. Failures are logged and, when an
    ``error_queue`` is present in ``group_queues``, reported there.

    Args:
        group_id: Used for task names and error reports.
        task_id: Forwarded to the upload tasks.
        mqtt_pub_ip: Broker host for publishing.
        mqtt_pub_port: Broker port for publishing.
        mqtt_pub_topic: Topic the tasks publish to.
        device_height: Only consumed by the currently disabled tasks.
        repeat_dis: Only consumed by the currently disabled tasks.
        repeat_time: Only consumed by the currently disabled tasks.
        group_queues: Mapping that provides ``stop_event``, ``event_queue``,
            ``timestamp_frame_queue``, ``cv_frame_queue``, ``invade_queue``
            and optionally ``error_queue``.
    """
    pid = os.getpid()
    logger.info(f"[ProcessB] 启动 (PID:{pid})任务ID: {task_id}")
    # Report the CPU affinity of this process.
    check_cpu_affinity()

    # Best effort: run at the lowest priority. os.nice is missing on some
    # platforms (AttributeError) and may be refused by the OS (OSError).
    try:
        os.nice(19)
    except (AttributeError, OSError) as e:
        logger.debug(f"[ProcessB] os.nice(19) skipped: {e}")

    def report_error(exc: Exception) -> None:
        """Push an error record to ``error_queue`` if one was provided."""
        error_queue = group_queues.get('error_queue')
        if error_queue:
            error_queue.put({
                'group_id': group_id,
                'process': 'ProcessB',
                'error': str(exc),
                'time': datetime.now()
            })

    async def run_upload_tasks():
        """Create the upload tasks, wait for the stop signal, then clean up."""
        mqtt = None
        executors = []
        try:
            # MQTT service used by all tasks.
            mqtt = MQTTService(mqtt_pub_ip, port=mqtt_pub_port)
            await mqtt.connect()

            # Shared queues and the stop signal. cv_frame_queue and
            # invade_queue are only needed by the disabled tasks below.
            stop_event = group_queues.get('stop_event')
            cv_frame_queue = group_queues.get('cv_frame_queue')
            invade_queue = group_queues.get('invade_queue')
            event_queue = group_queues.get('event_queue')
            timestamp_frame_queue = group_queues.get('timestamp_frame_queue')

            # Thread pools for blocking work; tracked so that cleanup does
            # not depend on which of them were created before a failure.
            loop = asyncio.get_running_loop()
            upload_executor = ThreadPoolExecutor(max_workers=Config.UPLOAD_WORKERS)
            executors.append(upload_executor)
            invade_executor = ThreadPoolExecutor(max_workers=Config.INVADE_WORKERS)
            executors.append(invade_executor)
            event_video_executor = ThreadPoolExecutor(max_workers=Config.EVENT_VIDEO_WORKERS)
            executors.append(event_video_executor)

            # Local cancellation flag handed to the tasks.
            cancel_flag = asyncio.Event()

            # Two further tasks are currently disabled:
            #   1. cal_des_invade(loop, invade_executor, ..., invade_queue, event_queue, ...)
            #   2. send_frame_to_s3_mq(loop, upload_executor, ..., cv_frame_queue, event_queue, ...)
            # Only task 3, cutting and publishing the event video, is active.
            tasks = [
                asyncio.create_task(
                    cut_evnt_video_publish(
                        task_id=task_id,
                        mqtt=mqtt,
                        mqtt_topic=mqtt_pub_topic,
                        cancel_flag=cancel_flag,
                        event_queue=event_queue,
                        timestamp_frame_queue=timestamp_frame_queue
                    ),
                    name=f"{group_id}_cut_evnt_video_publish"
                )
            ]

            # Wait for the stop signal.
            while not stop_event.is_set():
                await asyncio.sleep(0.1)

            # Stop signal received: cancel every task and wait for them.
            cancel_flag.set()
            for task in tasks:
                task.cancel()
            await asyncio.gather(*tasks, return_exceptions=True)
        except Exception as e:
            logger.error(f"[ProcessB] 异常: {e}")
            report_error(e)
        finally:
            # Release resources; the executors are shut down even when the
            # MQTT disconnect raises.
            try:
                if mqtt is not None:
                    await mqtt.disconnect()
            finally:
                for executor in executors:
                    executor.shutdown(wait=True)

    try:
        # Drive the asynchronous upload tasks.
        asyncio.run(run_upload_tasks())
    except Exception as e:
        logger.error(f"[ProcessB] 主循环异常: {e}")
        report_error(e)
def start_rtmp_processing_with_groups(video_url: str, task_id: str,
                                      model_configs: List[Dict],
                                      mqtt_pub_ip: str, mqtt_pub_port: int,
                                      mqtt_pub_topic: str,
                                      mqtt_sub_ip: str, mqtt_sub_port: int,
                                      mqtt_sub_topic: str,
                                      output_rtmp_url: str,
                                      invade_enable: bool, invade_file: str,
                                      camera_para_url: str,
                                      device_height: float, repeat_dis: float,
                                      repeat_time: float,
                                      process_group_manager: ProcessGroupManager = None):
    """Create the RTMP processing group for one task.

    A fresh ``ProcessGroupManager`` is instantiated when none is supplied.
    The group id is ``"group_<task_id>"`` and all remaining arguments are
    forwarded unchanged to ``create_process_group``.

    Returns:
        A ``(manager, result)`` tuple, where ``result`` is whatever
        ``create_process_group`` returned.
    """
    manager = ProcessGroupManager() if process_group_manager is None else process_group_manager

    # Collect the forwarded settings in one place before the call.
    group_settings = dict(
        group_id=f"group_{task_id}",
        video_url=video_url,
        task_id=task_id,
        model_configs=model_configs,
        mqtt_pub_ip=mqtt_pub_ip,
        mqtt_pub_port=mqtt_pub_port,
        mqtt_pub_topic=mqtt_pub_topic,
        mqtt_sub_ip=mqtt_sub_ip,
        mqtt_sub_port=mqtt_sub_port,
        mqtt_sub_topic=mqtt_sub_topic,
        output_rtmp_url=output_rtmp_url,
        invade_enable=invade_enable,
        invade_file=invade_file,
        camera_para_url=camera_para_url,
        device_height=device_height,
        repeat_dis=repeat_dis,
        repeat_time=repeat_time,
    )
    creation_result = manager.create_process_group(**group_settings)
    return manager, creation_result
if __name__ == '__main__':
    # Manual test entry point: start one processing group with hard-coded
    # settings and log CPU usage until interrupted.
    logger.info("=== 高性能AI视频处理系统 ===")
    logger.info(f"Python版本: {sys.version}")
    logger.info(f"操作系统: {sys.platform}")

    # CPU information.
    total_cpus = psutil.cpu_count(logical=False)
    logical_cpus = psutil.cpu_count(logical=True)
    logger.info(f"CPU核心: {total_cpus}物理/{logical_cpus}逻辑")

    # Check whether the kernel was booted with isolcpus. The file only
    # exists on Linux, so a read failure is expected elsewhere and ignored.
    try:
        with open('/proc/cmdline', 'r') as f:
            cmdline = f.read()
        if 'isolcpus=' in cmdline:
            logger.info("✅ 系统支持CPU隔离")
        else:
            logger.info("⚠️ 未启用isolcpus使用软件隔离")
    except (OSError, UnicodeDecodeError) as e:
        logger.debug(f"/proc/cmdline not readable: {e}")

    multiprocessing.freeze_support()
    multiprocessing.set_start_method('spawn', force=True)
    process_group_manager = ProcessGroupManager()

    # Model configuration (person / vehicle detector).
    model_configs = [{
        'path': r"/mnt/mydisk/ai/pt/best.pt",
        'engine_path': r"/mnt/mydisk/ai/engine/renche/renche.engine",
        'so_path': r"/mnt/mydisk/ai/engine/renche/libmyplugins.so",
        'cls_map': {'people': '', 'bicycle': '自行车',
                    'car': '车1', 'van': '车2',
                    'truck': '车3', 'tricycle': '车4',
                    'awning-tricycle': '车5', 'bus': '车6',
                    'motor': '车7', },
        'allowed_classes': ['people', 'bicycle', 'car', 'van', 'truck', 'tricycle', 'awning-tricycle', 'bus', 'motor'],
        "cls_index": [0, 1, 2, 3, 4, 5, 6, 7, 8],
        "class_names": ['people', 'bicycle', 'car', 'van', 'truck', 'tricycle', 'awning-tricycle', 'bus', 'motor'],
        "chinese_label": {0: '', 1: '自行车',
                          2: '车1', 3: '车2',
                          4: '车3', 5: '车4',
                          6: '车5', 7: '车6',
                          8: '车7', },
        "func_id": 100001,
        "list_func_id": 100000,
        "para_invade_enable": False,
        "config_conf": 0.4
    }]

    task_id = "441ba16f-757f-481f-8f73-68a885fcc229-test"

    # Endpoints that were used for earlier test setups (kept for reference):
    #   local broker:      mqtt 192.168.0.11:1883,
    #                      output rtmp://192.168.0.11:1935/live/1234567890ai123
    #   Jiuzhaigou Zharu:  video rtmp://183.222.73.60:1935/live/1581F8HGX259H00A0WKH,
    #                      sub topic thing/product/8UUXNA700A0PWB/drc/up
    #   Zechawa airfield:  video rtmp://183.222.73.60:1935/live/1581F8HGX259H00A0WJC,
    #                      sub topic thing/product/8UUXNA700A0PWA/drc/up,
    #                      mqtt 183.222.73.60:1883,
    #                      output rtmp://222.212.85.86:1935/live/1234567890ai123
    #   Alumni house:      video rtmp://222.212.85.86:1935/live/1581F6QAD243C00BMD6W

    # Active setup.
    video_url = "rtmp://192.168.130.21:1935/live/123456789012"
    mqtt_sub_topic = "thing/product/8UUXN6S00A0CK7TEST/drc/up"
    mqtt_pub_ip = "47.108.62.6"
    mqtt_pub_port = 12503
    mqtt_pub_topic = "thing/product/ai/events"
    mqtt_sub_ip = "47.108.62.6"
    mqtt_sub_port = 12503
    output_rtmp_url = "rtmp://192.168.130.21:1935/live/123456789012ai"
    invade_enable = True
    invade_file = "meta_data/校友之家耕地占用demo.geojson"
    camera_para_url = "meta_data/camera_para/xyzj_camera_para.txt"
    device_height = 600
    repeat_dis = 20
    repeat_time = 20

    # Start the process group.
    process_group_manager, group_id1 = start_rtmp_processing_with_groups(
        video_url, task_id, model_configs,
        mqtt_pub_ip, mqtt_pub_port, mqtt_pub_topic,
        mqtt_sub_ip, mqtt_sub_port, mqtt_sub_topic,
        output_rtmp_url,
        invade_enable, invade_file, camera_para_url,
        device_height, repeat_dis, repeat_time,
        process_group_manager
    )
    if group_id1:
        logger.info(f"✅ 进程组 {group_id1} 启动成功")
        logger.info("进程A: 高速推理 (CPU 0-3)")
        logger.info("进程B: 后台上传 (CPU 4-5)")
    else:
        logger.error("❌ 进程组启动失败")
        sys.exit(1)

    try:
        # Main monitoring loop.
        monitor_count = 0
        while True:
            time.sleep(1)
            monitor_count += 1
            if monitor_count % 10 == 0:  # every tenth pass
                logger.info(f"\n=== 系统监控 ({datetime.now()}) ===")
                # Per-core CPU usage; this call itself blocks for one second.
                cpu_percent = psutil.cpu_percent(interval=1, percpu=True)
                # Only the first six cores are shown.
                cpu_display = [f'{p:.1f}%' for p in cpu_percent[:6]]
                logger.info(f"CPU 0-5 使用率: {cpu_display}")
    except KeyboardInterrupt:
        logger.info("\n=== 收到停止信号,正在停止... ===")
        process_group_manager.stop_all_groups(timeout=10)
        logger.info("=== 所有进程已停止 ===")
    except Exception as e:
        logger.error(f"主进程异常: {e}")
        process_group_manager.stop_all_groups(timeout=10)