import asyncio import json import logging import multiprocessing import subprocess import time import traceback from collections import defaultdict from concurrent.futures import ThreadPoolExecutor from datetime import datetime from typing import List, Dict, Any, Optional import queue import os import sys import psutil import signal import re import threading import av import cv2 import numpy as np import torch from multiprocessing import Queue, Process, Event, Manager, Lock from cv_back_video import cal_tricker_results from middleware.MQTTService import MQTTService, MQTTDevice, empty_osd_callback from middleware.entity.air_attitude import Air_Attitude from middleware.entity.camera_para import read_camera_params, Camera_Para from middleware.entity.timestamp_queue import TimestampedQueue from middleware.minio_util import downFile, upload_frame_buff_from_buffer, upload_video_buff_from_buffer from touying.ImageReproject_python.cal_func import red_line_reproject, cal_canv_location_by_osd from touying.ImageReproject_python.img_types import Point from yolo.cv_multi_model_back_video import put_chinese_text, is_point_in_polygonlist, TrackIDEventFilter, \ haversine, frames_to_video_bytes from yolo.detect.multi_yolo_trt_detect_track_trt8 import MultiYoloTrtDetectorTrackId_TRT8 # 配置日志 logging.basicConfig( level=logging.INFO, format='%(asctime)s - [PID:%(process)d] - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) class Config: MAX_WORKERS = 8 FRAME_QUEUE_SIZE = 25 PROCESSED_QUEUE_SIZE = 60 RETRY_COUNT = 3 READ_RTMP_WORKERS = 2 PROCESS_FRAME_WORKERS = 2 UPLOAD_WORKERS = 2 WRITE_FRAME_WORKERS = 2 INVADE_WORKERS = 2 EVENT_VIDEO_WORKERS = 2 MAX_QUEUE_WARN_SIZE = 15 MODEL_INPUT_SIZE = (640, 640) DEFAULT_DEVICE = "cuda:0" if torch.cuda.is_available() else "cpu" TARGET_FPS = 25 MAX_FRAME_DROP = 5 PERF_LOG_INTERVAL = 5.0 # CPU隔离配置 - 使用独立的CPU核心 CPU_ALLOCATIONS = { 'process_a': { # RTMP处理进程 - 高优先级,高速推理 'cpus': [0, 1, 2, 3], # 使用4个CPU核心 'numa_node': 0, 'priority': 'high', # 高优先级 'sched_policy': 'fifo' # FIFO调度 }, 'process_b': { # 上传进程 - 低优先级,后台处理 'cpus': [4, 5], # 使用2个CPU核心 'numa_node': 0, 'priority': 'low', # 低优先级 'sched_policy': 'other' } } async def detect_video_resolution(loop, executor, video_url): """探测视频流的分辨率""" try: logger.info(f"开始探测视频分辨率: {video_url}") simple_cmd = [ 'ffprobe', '-v', 'error', '-select_streams', 'v:0', '-show_entries', 'stream=width,height', '-of', 'csv=p=0:s=x', video_url ] def run_simple_probe(): try: result = subprocess.run(simple_cmd, capture_output=True, text=True, timeout=15) if result.returncode == 0 and result.stdout.strip(): dimensions = result.stdout.strip().split('x') if len(dimensions) == 2: width = int(dimensions[0]) height = int(dimensions[1]) if width > 0 and height > 0: return width, height except Exception as e: logger.warning(f"简单分辨率探测失败: {e}") return None dimensions = await loop.run_in_executor(executor, run_simple_probe) if dimensions: width, height = dimensions logger.info(f"探测到视频分辨率: {width}x{height}") return width, height logger.warning("所有分辨率探测方法都失败,使用默认值 1920x1080") return 1920, 1080 except Exception as e: logger.error(f"分辨率探测异常: {e}") logger.warning("使用默认分辨率 1920x1080") return 1920, 1080 class CPUIsolator: """CPU隔离管理器 - 高性能版本""" def __init__(self): self.total_cpus = psutil.cpu_count(logical=False) self.logical_cpus = psutil.cpu_count(logical=True) logger.info(f"系统CPU信息:") logger.info(f" - 物理核心数: {self.total_cpus}") logger.info(f" - 逻辑核心数: {self.logical_cpus}") def isolate_process(self, pid: int, config: Dict) -> bool: """高性能CPU隔离""" try: cpu_list = config.get('cpus', []) priority = config.get('priority', 'normal') sched_policy = config.get('sched_policy', 'other') logger.info(f"隔离进程 {pid} 到CPU: {cpu_list}, 优先级: {priority}, 调度策略: {sched_policy}") process = psutil.Process(pid) # 1. 设置CPU亲和性 process.cpu_affinity(cpu_list) # 2. 设置调度策略和优先级 (Linux) if sys.platform == "linux": try: if sched_policy == 'fifo': # SCHED_FIFO: 实时调度,最高优先级 param = os.sched_param(os.sched_get_priority_max(os.SCHED_FIFO)) os.sched_setscheduler(pid, os.SCHED_FIFO, param) elif sched_policy == 'rr': # SCHED_RR: 轮转调度 param = os.sched_param(os.sched_get_priority_max(os.SCHED_RR)) os.sched_setscheduler(pid, os.SCHED_RR, param) elif sched_policy == 'batch': # SCHED_BATCH: 批处理调度 param = os.sched_param(0) os.sched_setscheduler(pid, os.SCHED_BATCH, param) except Exception as e: logger.warning(f"设置调度策略失败: {e}") # 3. 设置nice值 try: if priority == 'high': os.nice(-19) # 最高优先级 elif priority == 'low': os.nice(19) # 最低优先级 else: os.nice(0) # 正常优先级 except: pass # 验证隔离 current_affinity = process.cpu_affinity() logger.info(f"进程 {pid} 隔离完成: CPU亲和性={current_affinity}") return True except Exception as e: logger.error(f"进程隔离失败: {e}") return False def check_cpu_affinity(): """检查当前进程的CPU亲和性""" pid = os.getpid() try: process = psutil.Process(pid) affinity = process.cpu_affinity() # 检查调度策略 sched_policy = "unknown" if sys.platform == "linux": try: policy = os.sched_getscheduler(pid) if policy == os.SCHED_FIFO: sched_policy = "SCHED_FIFO" elif policy == os.SCHED_RR: sched_policy = "SCHED_RR" elif policy == os.SCHED_BATCH: sched_policy = "SCHED_BATCH" elif policy == os.SCHED_OTHER: sched_policy = "SCHED_OTHER" except: pass logger.info(f"进程 {pid} CPU状态: 亲和性={affinity}, 调度策略={sched_policy}") return affinity except Exception as e: logger.warning(f"检查CPU状态失败: {e}") return [] class ProcessGroupManager: """进程组管理器 - 高性能版本""" def __init__(self): self.manager = Manager() self.process_groups = {} self.group_queues = {} self.cpu_isolator = CPUIsolator() def create_process_group(self, group_id: str, video_url: str, task_id: str, model_configs: List[Dict], mqtt_pub_ip: str, mqtt_pub_port: int, mqtt_pub_topic: str, mqtt_sub_ip: str, mqtt_sub_port: int, mqtt_sub_topic: str, output_rtmp_url: str, invade_enable: bool, invade_file: str, camera_para_url: str, device_height: float, repeat_dis: float, repeat_time: float): """创建进程组""" # 创建队列 group_queues = { 'cv_frame_queue': Queue(maxsize=Config.PROCESSED_QUEUE_SIZE), 'event_queue': Queue(maxsize=Config.PROCESSED_QUEUE_SIZE), 'invade_queue': Queue(maxsize=Config.PROCESSED_QUEUE_SIZE), 'timestamp_frame_queue': TimestampedQueue, 'stop_event': Event(), 'error_queue': Queue(), 'performance_counter': Manager().Value('i', 0) # 性能计数器 } # 创建进程A (RTMP处理,高优先级) process_a = Process( target=rtmp_process_main, name=f"{group_id}_ProcessA", args=(group_id, video_url, task_id, model_configs, mqtt_pub_ip, mqtt_pub_port, mqtt_pub_topic, mqtt_sub_ip, mqtt_sub_port, mqtt_sub_topic, output_rtmp_url, invade_enable, invade_file, camera_para_url, group_queues) ) process_a.start() # CPU隔离 logger.info(f"隔离进程A (PID:{process_a.pid})") self.cpu_isolator.isolate_process(process_a.pid, Config.CPU_ALLOCATIONS['process_a']) time.sleep(1) if not process_a.is_alive(): logger.error("进程A启动失败") return None # 创建进程B (上传处理,低优先级) - 使用正确的函数名 process_b = Process( target=upload_process_main, # 修改为正确的函数名 name=f"{group_id}_ProcessB", args=(group_id, task_id, mqtt_pub_ip, mqtt_pub_port, mqtt_pub_topic, device_height, repeat_dis, repeat_time, group_queues) ) process_b.start() # CPU隔离 logger.info(f"隔离进程B (PID:{process_b.pid})") self.cpu_isolator.isolate_process(process_b.pid, Config.CPU_ALLOCATIONS['process_b']) time.sleep(1) if not process_b.is_alive(): logger.error("进程B启动失败") if process_a.is_alive(): process_a.terminate() return None process_list = [process_a, process_b] # 存储信息 self.process_groups[group_id] = { 'process_list': process_list, 'status': 'running', 'created_at': datetime.now(), 'task_id': task_id, 'video_url': video_url } self.group_queues[group_id] = group_queues logger.info(f"进程组 {group_id} 创建完成") # 启动监控 self.start_performance_monitor(group_id) return group_id def start_performance_monitor(self, group_id: str): """启动性能监控线程""" def monitor(): last_time = time.time() while group_id in self.process_groups: try: current_time = time.time() elapsed = current_time - last_time if elapsed >= 5.0: # 每5秒监控一次 # 检查进程状态 group_info = self.process_groups.get(group_id) if not group_info: break process_list = group_info.get('process_list', []) for i, process in enumerate(process_list): if process and process.is_alive(): try: proc = psutil.Process(process.pid) cpu_affinity = proc.cpu_affinity() cpu_percent = proc.cpu_percent(interval=0.5) memory_mb = proc.memory_info().rss / 1024 / 1024 if i == 0: logger.info( f"进程A性能 - CPU:{cpu_affinity}, 使用率:{cpu_percent:.1f}%, 内存:{memory_mb:.1f}MB") else: logger.info( f"进程B性能 - CPU:{cpu_affinity}, 使用率:{cpu_percent:.1f}%, 内存:{memory_mb:.1f}MB") except Exception as e: logger.warning(f"监控进程状态失败: {e}") last_time = current_time time.sleep(1) except Exception as e: logger.warning(f"监控线程异常: {e}") time.sleep(5) monitor_thread = threading.Thread( target=monitor, name=f"{group_id}_monitor", daemon=True ) monitor_thread.start() def stop_process_group(self, group_id: str, timeout: int = 10): """停止进程组""" if group_id not in self.process_groups: return False logger.info(f"停止进程组 {group_id}") group_info = self.process_groups[group_id] group_queues = self.group_queues.get(group_id, {}) # 发送停止信号 stop_event = group_queues.get('stop_event') if stop_event: stop_event.set() # 等待进程结束 process_list = group_info.get('process_list') for process in process_list: if process and process.is_alive(): process.join(timeout=timeout) if process.is_alive(): process.terminate() if group_id in self.group_queues: del self.group_queues[group_id] group_info['status'] = 'stopped' group_info['stopped_at'] = datetime.now() logger.info(f"进程组 {group_id} 已停止") return True def stop_all_groups(self, timeout: int = 10): """停止所有进程组""" logger.info("停止所有进程组") for group_id in list(self.process_groups.keys()): self.stop_process_group(group_id, timeout) logger.info("所有进程组已停止") # # # async def read_rtmp_frames( loop, read_rtmp_frames_executor: ThreadPoolExecutor, video_url: str, device: Optional[MQTTDevice] = None, topic_camera_osd: Optional[str] = None, method_camera_osd: Optional[str] = None, topic_osd_info: Optional[str] = None, method_osd_info: Optional[str] = None, cancel_flag: Optional[asyncio.Event] = None, frame_queue: asyncio.Queue = None, timestamp_frame_queue: TimestampedQueue = None ): """ 异步读取 RTMP 流帧(优化版:移除帧率控制,优化线程池) """ max_retries = 20 retry_delay = 2 pic_count = 0 attempt = 0 time_start = time.time_ns() # 添加开始时间统计 frame_count = 0 # 统计总帧数 if cancel_flag is None: cancel_flag = asyncio.Event() # loop = asyncio.get_running_loop() # 打印初始统计信息 print(f"开始读取RTMP流: {video_url}") while not cancel_flag.is_set() and attempt < max_retries: attempt += 1 if cancel_flag.is_set(): logger.info("收到停止信号,终止 RTMP 读取") break container = None try: logger.info(f"尝试连接 RTMP 流 (尝试 {attempt}/{max_retries}): {video_url}") # 1. 关键优化:将同步的 av.open 和流初始化放到线程池 container = await loop.run_in_executor(read_rtmp_frames_executor, av.open, video_url) video_stream = await loop.run_in_executor(read_rtmp_frames_executor, next, (s for s in container.streams if s.type == 'video')) logger.info(f"成功连接到 RTMP 流: {video_url} ({video_stream.width}x{video_stream.height})") # 2. 提前获取一次OSD消息(验证MQTT是否正常) if device and topic_osd_info and method_osd_info: osd_msg = device.get_latest_message(topic=topic_osd_info, method=method_osd_info) if osd_msg: logger.info(f"初始OSD消息获取成功: 高度={osd_msg.data.height}") else: logger.warning("初始OSD消息为空,可能MQTT尚未收到消息") # 3. 关键优化:将同步的帧迭代放到线程池,通过生成器异步获取 async def async_frame_generator(): """异步帧生成器:在后台线程迭代同步帧,通过yield返回给事件循环""" def sync_frame_iter(): try: for frame in container.decode(video=0): # 线程内检查取消标志(需定期检查,避免线程无法退出) if cancel_flag.is_set(): logger.info("后台线程检测到取消信号,停止帧迭代") break # 确保是3通道RGB if len(frame.planes) == 1: # 如果是灰度图 gray = frame.to_ndarray(format='gray') # 转换为3通道BGR(不修改尺寸) bgr = cv2.cvtColor(gray, cv2.COLOR_GRAY2BGR) yield bgr else: # 保持原始尺寸和色彩空间,只转换格式 bgr = frame.to_ndarray(format='bgr24') yield bgr except Exception as e: logger.error(f"同步帧迭代出错: {e}") finally: if container: container.close() logger.info("RTMP容器已关闭") # 将同步迭代器包装为异步生成器 gen = sync_frame_iter() while not cancel_flag.is_set(): try: # 每次获取一帧都通过线程池执行,避免长时间阻塞 frame = await loop.run_in_executor(read_rtmp_frames_executor, next, gen, None) if frame is None: # 迭代结束 break yield frame except StopIteration: logger.info("RTMP流帧迭代结束") break except Exception as e: logger.error(f"异步获取帧出错: {e}") break # 4. 异步迭代帧(不阻塞事件循环) async for frame in async_frame_generator(): if cancel_flag.is_set(): logger.info("检测到取消信号,停止读取帧") break try: # 5. 帧转换也放到线程池(av.Frame.to_ndarray是CPU密集操作) img = frame.copy() # 确保不修改原始帧 osd_info = None # 6. 此时事件循环未被阻塞,MQTT消息已缓存,get_latest_message可即时获取 if device and topic_osd_info and method_osd_info: osd_msg = device.get_latest_message(topic=topic_osd_info, method=method_osd_info) if osd_msg and hasattr(osd_msg, 'data'): osd_info = Air_Attitude( gimbal_pitch=osd_msg.data.gimbal_pitch, gimbal_roll=osd_msg.data.gimbal_roll, gimbal_yaw=osd_msg.data.gimbal_yaw, height=osd_msg.data.height, latitude=osd_msg.data.latitude, longitude=osd_msg.data.longitude ) # 7. 异步放入帧队列(避免队列满时阻塞) if not frame_queue.full(): # if True: pic_count += 1 frame_count += 1 # 增加总帧数统计 time_ns = time.time_ns() # 定期输出统计信息(每1000帧) if time_ns - time_start > 1000000000: print(f"readFrames {pic_count}") pic_count = 0 time_start = time_ns # # if img is not None and osd_info is not None: await frame_queue.put((img, osd_info, time_ns)) timestamp_frame_queue.append({ "timestamp": time_ns, "frame": img }) logger.debug( f"已放入帧队列,累计帧数: {pic_count},队列剩余空间: {frame_queue.maxsize - frame_queue.qsize()}") else: logger.warning("帧队列已满,等待1ms后重试") await asyncio.sleep(0.001) except Exception as frame_error: logger.error(f"处理单帧时出错: {frame_error}", exc_info=True) continue except (av.AVError, IOError) as e: logger.error(f"RTMP 流错误 (尝试 {attempt}/{max_retries}): {e}") if attempt < max_retries: await asyncio.sleep(retry_delay) else: raise RuntimeError(f"无法连接 RTMP 流 (尝试 {max_retries} 次后失败): {video_url}") except asyncio.CancelledError: logger.info("read_rtmp_frames 收到取消信号") raise except Exception as e: logger.error(f"未知错误: {e}", exc_info=True) if attempt < max_retries: await asyncio.sleep(retry_delay) finally: # 双重保险:确保容器关闭 if container and not container.closed: await loop.run_in_executor(None, container.close) logger.info("RTMP容器在finally中关闭") # 最终统计信息 if frame_count > 0: total_time = (time.time_ns() - time_start) / 1e9 avg_fps = frame_count / total_time if total_time > 0 else 0 print(f"RTMP流读取完成,总帧数: {frame_count}, 总时间: {total_time:.2f}秒, 平均FPS: {avg_fps:.2f}") else: print("RTMP流读取失败,未获取到任何帧") logger.info(f"RTMP 流已结束或被取消,累计处理帧数: {pic_count}") # # async def read_rtmp_frames( # loop, # read_rtmp_frames_executor: ThreadPoolExecutor, # video_url: str, # device: Optional[MQTTDevice] = None, # topic_camera_osd: Optional[str] = None, # method_camera_osd: Optional[str] = None, # topic_osd_info: Optional[str] = None, # method_osd_info: Optional[str] = None, # cancel_flag: Optional[asyncio.Event] = None, # frame_queue: asyncio.Queue = None, # timestamp_frame_queue: TimestampedQueue = None # ): # """ # 基于 FFmpeg 读取 RTMP 流帧(优化版:高性能读取,处理损坏帧) # """ # print("read_rtmp_frames") # max_retries = 20 # retry_delay = 2 # pic_count = 0 # attempt = 0 # time_start = time.time_ns() # frame_count = 0 # # if cancel_flag is None: # cancel_flag = asyncio.Event() # # print(f"开始读取RTMP流: {video_url}") # # while not cancel_flag.is_set() and attempt < max_retries: # attempt += 1 # if cancel_flag.is_set(): # logger.info("收到停止信号,终止 RTMP 读取") # break # # ffmpeg_process = None # width, height = None, None # frame_size = None # # try: # logger.info(f"尝试连接 RTMP 流 (尝试 {attempt}/{max_retries}): {video_url}") # # # 1. 探测视频分辨率 # width, height = await detect_video_resolution(loop, read_rtmp_frames_executor, video_url) # # if width is None or height is None: # logger.warning("使用默认分辨率 1920x1080") # width, height = 1920, 1080 # # frame_size = width * height * 3 # logger.info(f"视频分辨率: {width}x{height}, 帧大小: {frame_size} bytes") # # # 2. 启动 FFmpeg 进程(优化参数提高性能) # ffmpeg_cmd = [ # 'ffmpeg', # '-i', video_url, # '-loglevel', 'warning', # '-fflags', 'nobuffer', # '-flags', 'low_delay', # '-avioflags', 'direct', # '-threads', '1', # 单线程,减少上下文切换 # '-f', 'image2pipe', # '-pix_fmt', 'bgr24', # '-vcodec', 'rawvideo', # '-' # ] # # ffmpeg_process = await loop.run_in_executor( # read_rtmp_frames_executor, # lambda: subprocess.Popen( # ffmpeg_cmd, # stdout=subprocess.PIPE, # stderr=subprocess.PIPE, # bufsize=frame_size # 设置合适的缓冲区大小 # ) # ) # # logger.info(f"成功启动 FFmpeg 进程连接 RTMP 流: {video_url}") # # # 3. 初始化帧读取状态 # frame_sequence = 0 # last_timestamp = time_start # consecutive_corrupted_frames = 0 # 连续损坏帧计数 # max_consecutive_corrupted = 10 # 最大连续损坏帧数 # # while not cancel_flag.is_set(): # try: # # 直接读取完整帧(高性能方式) # raw_frame = await loop.run_in_executor( # read_rtmp_frames_executor, # lambda: ffmpeg_process.stdout.read(frame_size) # ) # # if not raw_frame: # logger.warning("读取到空帧数据,流可能已结束") # break # # current_time_ns = time.time_ns() # frame_sequence += 1 # frame_count += 1 # # # 处理帧数据(无论是否完整) # img = None # is_corrupted = False # # print(f"读取 read_rtmp_frames 判断") # try: # if len(raw_frame) == frame_size: # # 完整帧处理 # frame = np.frombuffer(raw_frame, dtype=np.uint8).reshape((height, width, 3)) # img = frame.copy() # consecutive_corrupted_frames = 0 # 重置连续损坏计数 # else: # # 损坏帧处理 # logger.warning(f"帧数据损坏: {len(raw_frame)}/{frame_size} bytes, 序列: {frame_sequence}") # is_corrupted = True # consecutive_corrupted_frames += 1 # # # 创建替代帧 # if consecutive_corrupted_frames <= max_consecutive_corrupted: # # 尝试部分恢复 # img = np.zeros((height, width, 3), dtype=np.uint8) # valid_data = min(len(raw_frame), frame_size) # if valid_data > 0: # # 尽可能填充有效数据 # temp_frame = np.frombuffer(raw_frame[:valid_data], dtype=np.uint8) # img.flat[:len(temp_frame)] = temp_frame # else: # # 连续损坏过多,创建空白帧 # img = np.zeros((height, width, 3), dtype=np.uint8) # logger.error(f"连续损坏帧过多 ({consecutive_corrupted_frames}),创建空白帧") # # except Exception as frame_error: # logger.error(f"帧数据处理错误: {frame_error}") # # 创建空白帧作为后备 # img = np.zeros((height, width, 3), dtype=np.uint8) # is_corrupted = True # consecutive_corrupted_frames += 1 # # print(f"读取 read_rtmp_frames 判断1") # # 获取OSD信息 # osd_info = None # if device and topic_osd_info and method_osd_info: # try: # osd_msg = device.get_latest_message(topic=topic_osd_info, method=method_osd_info) # if osd_msg and hasattr(osd_msg, 'data'): # osd_info = Air_Attitude( # gimbal_pitch=osd_msg.data.gimbal_pitch, # gimbal_roll=osd_msg.data.gimbal_roll, # gimbal_yaw=osd_msg.data.gimbal_yaw, # height=osd_msg.data.height, # latitude=osd_msg.data.latitude, # longitude=osd_msg.data.longitude # ) # except Exception as osd_error: # logger.warning(f"获取OSD信息失败: {osd_error}") # # print(f"读取 read_rtmp_frames 判断2") # # 放入帧队列 # # if img is not None and not frame_queue.full(): # if True: # # 确保时间戳递增 # if current_time_ns <= last_timestamp: # current_time_ns = last_timestamp + 1 # last_timestamp = current_time_ns # # # 统计信息 # pic_count += 1 # if current_time_ns - time_start > 1000000000: # 1秒 # elapsed_seconds = (current_time_ns - time_start) / 1e9 # fps = pic_count / elapsed_seconds if elapsed_seconds > 0 else 0 # corrupted_rate = (consecutive_corrupted_frames / pic_count * 100) if pic_count > 0 else 0 # print( # f"readFrames 序列:{frame_sequence} 帧数:{pic_count} FPS:{fps:.2f} 损坏率:{corrupted_rate:.1f}%") # pic_count = 0 # time_start = current_time_ns # # # print(f"读取 read_rtmp_frames 实时流") # # # # 准备帧数据 # # frame_data = { # # "sequence": frame_sequence, # # "frame": img, # # "osd_info": osd_info, # # "timestamp": current_time_ns, # # "is_corrupted": is_corrupted # # } # time_ns = time.time_ns() # # 放入队列 # await frame_queue.put((img, osd_info, time_ns)) # timestamp_frame_queue.append({ # "timestamp": time_ns, # "frame": img # }) # # if frame_sequence % 100 == 0: # 每100帧输出一次日志 # logger.debug(f"已处理帧 序列:{frame_sequence} 累计:{frame_count}") # # elif frame_queue.full(): # logger.warning("帧队列已满,跳过此帧") # await asyncio.sleep(0.001) # 短暂等待 # # # 检查是否需要重新探测分辨率(仅在连续损坏时) # if consecutive_corrupted_frames > 5: # logger.warning("连续帧损坏,尝试重新探测分辨率") # try: # new_width, new_height = await detect_video_resolution(loop, read_rtmp_frames_executor, # video_url) # if new_width and new_height and (new_width != width or new_height != height): # logger.info(f"分辨率变化: {width}x{height} -> {new_width}x{new_height}") # width, height = new_width, new_height # frame_size = width * height * 3 # consecutive_corrupted_frames = 0 # 重置计数 # except Exception as probe_error: # logger.warning(f"重新探测分辨率失败: {probe_error}") # # except Exception as e: # logger.error(f"读取帧数据时出错: {e}", exc_info=True) # # 检查 FFmpeg 进程状态 # if ffmpeg_process and ffmpeg_process.poll() is not None: # try: # stderr_output = ffmpeg_process.stderr.read().decode('utf-8', errors='ignore') # if stderr_output: # logger.error(f"FFmpeg 进程错误: {stderr_output}") # except: # pass # logger.error("FFmpeg 进程已退出") # break # # 短暂等待后继续 # await asyncio.sleep(0.01) # continue # # except (subprocess.SubprocessError, IOError) as e: # logger.error(f"RTMP 流错误 (尝试 {attempt}/{max_retries}): {e}") # if attempt < max_retries: # await asyncio.sleep(retry_delay) # else: # raise RuntimeError(f"无法连接 RTMP 流 (尝试 {max_retries} 次后失败): {video_url}") # except asyncio.CancelledError: # logger.info("read_rtmp_frames 收到取消信号") # raise # except Exception as e: # logger.error(f"未知错误: {e}", exc_info=True) # if attempt < max_retries: # await asyncio.sleep(retry_delay) # finally: # if ffmpeg_process: # try: # ffmpeg_process.terminate() # try: # await asyncio.wait_for( # loop.run_in_executor(read_rtmp_frames_executor, ffmpeg_process.wait), # timeout=2.0 # ) # except asyncio.TimeoutError: # ffmpeg_process.kill() # await loop.run_in_executor(read_rtmp_frames_executor, ffmpeg_process.wait) # except Exception as e: # logger.warning(f"关闭FFmpeg进程时出错: {e}") # logger.info("FFmpeg 进程已关闭") # # if frame_count > 0: # total_time = (time.time_ns() - time_start) / 1e9 # avg_fps = frame_count / total_time if total_time > 0 else 0 # print(f"RTMP流读取完成,总帧数: {frame_count}, 总时间: {total_time:.2f}秒, 平均FPS: {avg_fps:.2f}") # else: # print("RTMP流读取失败,未获取到任何帧") # # logger.info(f"RTMP 流已结束或被取消,累计处理帧数: {frame_count}") # # async def high_speed_process_frames(detector: MultiYoloTrtDetectorTrackId_TRT8, cancel_flag: asyncio.Event, # frame_queue: asyncio.Queue, processed_queue: asyncio.Queue, # performance_counter): # """高速处理帧 - 保持原有速度""" # frame_count = 0 # start_time = time.time() # last_log_time = start_time # # # 检查CPU状态 # check_cpu_affinity() # # # 设置高性能模式 # logger.info("[ProcessA] 进入高速推理模式") # # # 预加载图片 # # test_image = cv2.imread(r"/mnt/mydisk1/dj/ai2/middleware/20250711-111516-531_1761271860531810831.jpg") # test_image = cv2.imread(r"E:\yolo-dataset\image\0000006_00611_d_0000002.jpg") # if test_image is None: # logger.error("[ProcessA] 无法加载测试图片") # return # # # 使用高速循环 # batch_size = 4 # batch_frames = [] # batch_timestamps = [] # # while not cancel_flag.is_set(): # try: # # 高速处理,不添加额外延迟 # current_time = time.time_ns() # # # 添加到批次 # batch_frames.append(test_image.copy()) # batch_timestamps.append(current_time) # frame_count += 1 # # # 当批次满时进行推理 # if len(batch_frames) >= batch_size: # try: # # 批量推理 # time_pr_start = time.time_ns() # # # 批量处理每个帧 # batch_results = [] # for frame in batch_frames: # detections, detections_list, model_para = await detector.predict(frame) # batch_results.append({ # 'detections': detections, # 'detections_list': detections_list, # 'model_para': model_para # }) # # time_pr_end = time.time_ns() # avg_inference_time = (time_pr_end - time_pr_start) / len(batch_frames) / 1000000 # # # 批量放入队列 # for i, (frame, result, timestamp) in enumerate(zip(batch_frames, batch_results, batch_timestamps)): # processed_data = { # 'frame': frame, # 'osd_info': None, # 'detections': result['detections'], # 'detections_list': result['detections_list'], # 'timestamp': timestamp, # 'model_para': result['model_para'], # 'predict_state': bool(result['detections']) # } # # if not processed_queue.full(): # await processed_queue.put(processed_data) # # # 性能统计 # current_time_log = time.time() # if current_time_log - last_log_time >= 1.0: # 每秒输出一次 # total_elapsed = current_time_log - start_time # fps = frame_count / total_elapsed # logger.info( # f"[ProcessA] 推理速度: {fps:.1f} FPS, 累计: {frame_count}帧, 平均推理: {avg_inference_time:.2f}ms") # if performance_counter: # performance_counter.value = frame_count # last_log_time = current_time_log # # except Exception as e: # logger.error(f"[ProcessA] 批量推理错误: {e}") # # # 清空批次 # batch_frames.clear() # batch_timestamps.clear() # # # 微小延迟避免CPU 100% # await asyncio.sleep(0.0001) # # except asyncio.CancelledError: # break # except Exception as e: # logger.error(f"[ProcessA] 处理错误: {e}") # await asyncio.sleep(0.01) # # logger.info(f"[ProcessA] 推理结束,总帧数: {frame_count}") async def high_speed_process_frames(detector: MultiYoloTrtDetectorTrackId_TRT8, cancel_flag: asyncio.Event, frame_queue: asyncio.Queue, processed_queue: asyncio.Queue): """协程处理帧队列""" start_time = time.time() time_start = time.time_ns() pic_count = 0 while not cancel_flag.is_set(): frame_start = time.time() try: frame, osd_info, timestamp = await asyncio.wait_for( frame_queue.get(), timeout=0.5 # 延长超时,适配处理耗时 ) try: time_pr_start = time.time_ns() detections, detections_list, model_para = await detector.predict(frame) time_pr_end = time.time_ns() print(f"time_pr_starttime_pr_start {(time_pr_end - time_pr_start) / 1000000}") predict_state = True if detections: print("检测到任何目标") if not detections: predict_state = False logger.debug("未检测到任何目标") # continue # # # # 显示帧用于调试(可选) # cv2.imshow('process_frames', frame) # if cv2.waitKey(1) & 0xFF == ord('q'): # stop_event.set() # break processed_data = { 'frame': frame, 'osd_info': osd_info, 'detections': detections, 'detections_list': detections_list, 'timestamp': timestamp, 'model_para': model_para, 'predict_state': predict_state # predict状态判定,方便rtmp推流做状态判定 } if not processed_queue.full(): time_end = time.time_ns() pic_count = pic_count + 1 if time_end - time_start > 1000000000: print(f"processframes {pic_count}") pic_count = 0 time_start = time_end await processed_queue.put(processed_data) else: logger.warning("处理队列已满,丢弃帧") stats['dropped_frames'] += 1 if 'frame' in locals(): frame_time = time.time() - frame_start # print(f"处理帧耗时: {frame_time:.4f}s") except Exception as e: logger.error(f"处理帧时出错: {e}", exc_info=True) stats['dropped_frames'] += 1 await asyncio.sleep(0.1) except asyncio.TimeoutError: continue except asyncio.CancelledError: print("process_frames 收到取消信号") raise except Exception as e: logger.error(f"获取帧时发生意外错误: {e}", exc_info=True) await asyncio.sleep(0.1) # finally: # log_perf('process_frames', start_time) print("process_frames读取线程已停止") # async def write_results_to_rtmp_high_speed( # task_id: str, # output_url: str = None, # input_fps: float = None, # list_points: List[List[Any]] = None, # camera_para: Camera_Para = None, # invade_state: bool = False, # cancel_flag: asyncio.Event = None, # processed_queue: asyncio.Queue = None, # invade_queue: Queue = None, # cv_frame_queue: asyncio.Queue = None, # cv_process_queue: Optional[Queue] = None, # event_process_queue: Optional[Queue] = None, # stream_containers: Dict[str, Any] = None # ): # """高速推流处理""" # frame_count = 0 # start_time = time.time() # last_log_time = start_time # # logger.info(f"[ProcessA] 推流任务启动 - 高速模式") # # try: # while not cancel_flag.is_set(): # try: # # 高速获取数据 # processed_data = await asyncio.wait_for( # processed_queue.get(), # timeout=0.01 # ) # # frame_count += 1 # # # 放入进程间队列 # if cv_process_queue is not None and not cv_process_queue.full(): # try: # # 简化数据,只放关键信息 # simplified_data = { # 'frame_count': frame_count, # 'timestamp': processed_data.get('timestamp'), # 'predict_state': processed_data.get('predict_state', False) # } # cv_process_queue.put_nowait(simplified_data) # except queue.Full: # pass # # # 性能统计 # current_time = time.time() # if current_time - last_log_time >= 1.0: # 每秒输出一次 # elapsed = current_time - start_time # fps = frame_count / elapsed # logger.info(f"[ProcessA] 推流处理: {frame_count}帧, FPS: {fps:.1f}") # last_log_time = current_time # # except asyncio.TimeoutError: # # 队列为空,继续循环 # await asyncio.sleep(0.001) # continue # except asyncio.CancelledError: # break # except Exception as e: # logger.error(f"[ProcessA] 推流错误: {e}") # # except Exception as e: # logger.error(f"[ProcessA] 推流主循环异常: {e}") # # finally: # logger.info(f"[ProcessA] 推流结束,总帧数: {frame_count}") async def write_results_to_rtmp_high_speed(task_id: str, output_url: str = None, input_fps: float = None, list_points: list[list[any]] = None, camera_para: Camera_Para = None, invade_state: bool = False, cancel_flag: asyncio.Event = None, processed_queue: asyncio.Queue = None, invade_queue: asyncio.Queue = None, cv_frame_queue: asyncio.Queue = None, cv_frame_thread_queue: Queue = None, cv_process_queue: Optional[Queue] = None, stream_containers: Dict[str, Any] = None, ): # global stream_containers, count_pic start_time = time.time() time_start = time.time_ns() pic_count = 0 # 修改推流参数 options = { 'preset': 'veryfast', 'tune': 'zerolatency', 'crf': '23', 'g': '50', # 关键帧间隔 'threads': '2', # 限制编码线程 } codec_name = 'libx264' max_retries = 3 retry_delay = 2.0 # 初始化视频输出 output_video_path = None video_writer = None frame_width, frame_height = None, None fps = input_fps or Config.TARGET_FPS last_frame_time = time.time() - 1 frame_interval = 1.0 / fps try: while not cancel_flag.is_set(): frame_start = time.time() try: # 第一层帧率控制 # current_time = time.time() # # time_diff = frame_interval - (current_time - last_frame_time) # if time_diff > 0: # await asyncio.sleep(time_diff) # last_frame_time = current_time processed_data = await asyncio.wait_for( processed_queue.get(), timeout=1 ) # 确保 processed_data 是字典 if not isinstance(processed_data, dict): print(f"❌ 错误:processed_data 不是字典,而是 {type(processed_data)}") continue frame = processed_data['frame'] # 绘制检测结果 frame_copy = frame.copy() predict_state = processed_data['predict_state'] osd_info = processed_data['osd_info'] img_height, img_width = frame.shape[:2] results = [] results_list = [] # 启用侵限且拿到了飞机的姿态信息,再绘制红线 if invade_state and osd_info: gimbal_yaw = osd_info.gimbal_yaw gimbal_pitch = osd_info.gimbal_pitch gimbal_roll = osd_info.gimbal_roll height = osd_info.height # print(f"heightheightheight {height}") cam_longitude = osd_info.longitude cam_latitude = osd_info.latitude # 当前list_points 虽然是二维数组,但是只存了一个,后续根据业务变化 for points in list_points: # 批量返回图像的像素坐标 point_list = [] results = red_line_reproject(gimbal_yaw, gimbal_pitch, gimbal_roll, height, cam_longitude, cam_latitude, img_width, img_height, points, camera_para) if results: results_list.append(results) # 支持两个区域,高压侵限、营业线侵限 for point in results: point_list.append([point["u"], point["v"]]) cv2.polylines(frame_copy, [np.array(point_list, dtype=np.int64)], isClosed=True, color=(0, 0, 255), thickness=2) # print(f"predict_statepredict_state {predict_state}") # 模型输出了推理结果 if predict_state: # 测试代码,用做测试推理结果,初始化视频写入器(如果尚未初始化) if video_writer is None and output_video_path: frame_width = frame.shape[1] frame_height = frame.shape[0] # 定义视频编码器和输出文件 fourcc = cv2.VideoWriter_fourcc(*'mp4v') # 或者使用 'avc1' 等其他编码 # fps = Config.TARGET_FPS # 使用配置中的目标帧率 video_writer = cv2.VideoWriter( output_video_path, fourcc, fps, (frame_width, frame_height) ) print(f"视频写入器已初始化,分辨率: {frame_width}x{frame_height}, FPS: {fps}") detections = processed_data['detections'] detections_list = processed_data['detections_list'] model_para = processed_data['model_para'] class_names = model_para[0]["model_class_names"] chinese_label = model_para[0]["model_chinese_labe"] cls_map = model_para[0]["cls_map"] para_invade_enable = model_para[0]["para_invade_enable"] model_list_func_id = model_para[0]["model_list_func_id"] model_func_id = model_para[0]["func_id"] invade_point = [] message_point = [] target_point = [] # 存储满足条件的图像坐标,方便后续经纬度转换 cls_count = 0 # # 初始化统计字典 class_stats = defaultdict(int) reversed_dict = {value: 0 for value in cls_map.values()} bg_color = (173, 216, 230) # 文本框底色使用淡蓝色 text_color = (0, 255, 0) # 绿色 for det in detections: x1, y1, x2, y2 = map(int, det.bbox) # 确保坐标是整数 cls_id = det.class_id # 假设Detection对象有class_id属性 class_name = det.class_name confidence = det.confidence track_id = det.track_id new_track_id = track_id * 100 + cls_id # 类型小于100或者为负数 # 更新统计 class_stats[cls_id] += 1 # 如果开起侵限功能,就只显示侵限内的框 point_x = (x1 + x2) / 2 point_y = (y1 + y2) / 2 # print(f"class_name--{class_name}") # print(f"model_class_names: {model_para[0]['model_class_names']}") if class_name not in model_para[0]["model_class_names"]: continue en_name = model_para[0]["model_chinese_labe"][ model_para[0]["model_class_names"].index(class_name)] if invade_state: # 同时适配多个区域的侵限判断 is_invade = is_point_in_polygonlist(point_x, point_y, results_list) # is_invade = is_point_in_polygon(point_x, point_y, results) # print(f"is_invadeis_invadeis_invade {is_invade} {len(results)}") if is_invade: cls_count += 1 invade_point.append({ "u": point_x, "v": point_y, "class_name": class_name }) target_point.append({ "u": point_x, "v": point_y, "cls_id": cls_id, "track_id": track_id, "new_track_id": new_track_id }) # 对于侵限,只存储侵限目标 # model_list_func_id = model_para[0]["model_list_func_id"] # model_func_id = model_para[0]["func_id"] message_point.append({ "confidence": float(confidence), "cls_id": cls_id, "type_name": en_name, "track_id": track_id, "box": [x1, y1, x2, y2] }) label = f"{en_name}:{confidence:.2f}:{track_id}" # 计算文本位置 text_size = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, fontScale=8, thickness=4)[ 0] text_width, text_height = text_size[0], text_size[1] text_x = x1 text_y = y1 - 5 # 如果文本超出图像顶部,则放在框内部下方 if text_y < 0: text_y = y2 + text_height + 5 temp_img = frame_copy.copy() frame_copy = put_chinese_text( temp_img, # label, # 置信度、类别、用作测试 "", # 注释掉汉字 (text_x, text_y - text_height), ) else: cls_count += 1 # 绘制边界框 cv2.rectangle(frame_copy, (x1, y1), (x2, y2), (0, 255, 255), 2) message_point.append({ "confidence": float(confidence), "cls_id": cls_id, "type_name": en_name, "track_id": track_id, "box": [x1, y1, x2, y2] }) target_point.append({ "u": point_x, "v": point_y, "cls_id": cls_id, "track_id": track_id, "new_track_id": new_track_id }) # 对于侵限,只存储侵限目标 # 准备标签文本 # label = f"{chinese_label.get(cls_id, class_name)}: {confidence:.2f}:{track_id}" label = f"{confidence:.2f}:{track_id}" # 计算文本位置 text_size = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, fontScale=8, thickness=8)[0] text_width, text_height = text_size[0], text_size[1] text_x = x1 text_y = y1 - 5 # 如果文本超出图像顶部,则放在框内部下方 if text_y < 0: text_y = y2 + text_height + 5 # 绘制文本背景 padding = 2 temp_img = frame_copy.copy() frame_copy = put_chinese_text( temp_img, # label, # 置信度、类别、用作测试 "", # 注释掉汉字 (text_x, text_y - text_height), ) if invade_state: for point in message_point: cv2.rectangle(frame_copy, (point["box"][0], point["box"][1]), (point["box"][2], point["box"][3]), (0, 255, 255), 2) # 画红线 # 在左上角显示统计结果 stats_text = [] for cls_id, count in class_stats.items(): cls_name = chinese_label.get(cls_id, class_names[cls_id] if class_names and cls_id < len( class_names) else str( cls_id)) reversed_dict[cls_name] = count for key, value in reversed_dict.items(): stats_text.append(f"{key}: {value}") if stats_text: # 计算统计文本的总高度 text_height = cv2.getTextSize("Test", cv2.FONT_HERSHEY_SIMPLEX, fontScale=0.5, thickness=1)[0][ 1] total_height = len(stats_text) * (text_height + 18) # 5是行间距 # 统计文本的起始位置(左上角) start_x = 50 # 留出200像素宽度 start_y = 20 # 从顶部开始 # # 绘制统计背景 # # bg_color = (0, 0, 0) # frame_copy = cv2.rectangle( # frame_copy, # (start_x - 10, start_y - 10), # (400, start_y + total_height + 20), # bg_color, # -1 # ) # # 逐行绘制统计文本 # for i, text in enumerate(stats_text): # y_pos = start_y + i * (text_height + 30) # temp_img = frame_copy.copy() # frame_copy = put_chinese_text( # temp_img, # text, # (start_x, y_pos), # color=text_color # ) new_data = { 'frame_copy': frame_copy, 'frame': frame, "osd_info": osd_info, 'detections': detections, "message_point": message_point, "cls_count": cls_count, "target_point": target_point, "model_list_func_id": model_list_func_id, "model_func_id": model_func_id, # 提取第一个模型的func_id 字段,因为现在已经要做整合,无法在区分各个提取第一个模型的func_id 'timestamp': processed_data.get('timestamp'), "detections_list": detections_list, "model_para": model_para # 'model_para': processed_data.get('model_para', {}) # 确保 model_para 存在 } # 临时代码 rtmp 和侵限逻辑要改 if invade_state: # para_list 中使能了 para_invade_enable,才做侵限判断 if para_invade_enable: if not invade_queue.full(): await invade_queue.put(new_data) else: if not cv_frame_queue.full(): await cv_frame_queue.put(new_data) if not cv_frame_thread_queue.full(): cv_frame_thread_queue.put(new_data) time_end = time.time_ns() pic_count = pic_count + 1 if time_end - time_start > 1000000000: print(f"writeFrames {pic_count}") pic_count = 0 time_start = time_end # # count_p = count_p + 1 # cv2.imwrite(f"save_pic/rtmp/test-{count_p}.jpg", frame_copy) # video_writer.write(frame_copy) # else: # frame_copy = frame # 转换颜色空间 rgb_frame = cv2.cvtColor(frame_copy, cv2.COLOR_BGR2RGB) # # # # # 显示帧用于调试(可选) # cv2.imshow(f"write_results_to_rtmp-{task_id}", frame_copy) # if cv2.waitKey(1) & 0xFF == ord('q'): # stop_event.set() # break # 初始化推流容器(如果尚未初始化) if output_url and output_url not in stream_containers: try: # container = av.open(output_url, mode='w', format='flv') container = av.open( output_url, mode='w', format='flv', options={ # 添加RTMP连接超时(5秒)和数据超时(10秒) 'rtmp_connect_timeout': '5000000', # 单位:微秒(5秒) 'rtmp_timeout': '10000000', # 单位:微秒(10秒) 'stimeout': '5000000' # 底层socket超时 } ) stream = container.add_stream(codec_name, rate=Config.TARGET_FPS) # stream.time_base = f"1/{Config.TARGET_FPS}" stream.width = frame.shape[1] stream.height = frame.shape[0] stream.pix_fmt = 'yuv420p' stream.options = options stream_containers[output_url] = { 'container': container, 'stream': stream, 'last_frame_time': time.time(), 'frame_count': 0, 'retry_count': 0 } print(f"✅ 推流初始化成功: {output_url}") except Exception as e: print(f"❌ 推流初始化失败: {e}") if 'container' in locals(): try: container.close() except: pass await asyncio.sleep(1.0) continue # 推流逻辑 if output_url and output_url in stream_containers: try: container_info = stream_containers[output_url] stream = container_info['stream'] container = container_info['container'] if rgb_frame.dtype == np.uint8: av_frame = av.VideoFrame.from_ndarray(rgb_frame, format='rgb24') packets = stream.encode(av_frame) # 这是异步方法 # print(f"📦 encode 生成 {len(packets)} 个 packet") if packets: for packet in packets: try: container.mux(packet) container_info['last_frame_time'] = time.time() container_info['frame_count'] += 1 except Exception as e: logger.warning(f"推流数据包错误: {e}") container_info['retry_count'] += 1 if container_info['retry_count'] > max_retries: raise else: # 编码器仍在初始化,不更新 last_frame_time pass # 每100帧打印一次状态 # if container_info['frame_count'] % 100 == 0: # print(f"ℹ️ 已推送 {container_info['frame_count']} 帧到 {output_url}") if 'frame' in locals(): frame_time = time.time() - frame_start print(f"推流帧耗时: {frame_time:.4f}s") else: print(f"⚠️ 无效帧格式: {rgb_frame.dtype}") except Exception as e: logger.error(f"❌ 推流错误: {e}") # 尝试重新初始化推流 if output_url in stream_containers: try: stream_containers[output_url]['container'].close() except: pass del stream_containers[output_url] await asyncio.sleep(retry_delay) continue except asyncio.TimeoutError: # if stop_event.is_set(): # break continue except asyncio.CancelledError: print("write_results_to_rtmp 收到取消信号") raise except Exception as e: logger.error(f"推流处理异常: {e}", exc_info=True) await asyncio.sleep(0.1) finally: # log_perf('write_results_to_rtmp', start_time) # 清理推流容器 for url, info in list(stream_containers.items()): try: if 'container' in info: info['container'].close() except Exception as e: logger.warning(f"关闭推流容器时出错: {e}") stream_containers.clear() # 清理视频写入器 if video_writer is not None: video_writer.release() print("write_results_to_rtmp读取线程已停止") async def rtmp_process_async_high_speed(group_id: str, video_url: str, task_id: str, model_configs: List[Dict], mqtt_pub_ip: str, mqtt_pub_port: int, mqtt_pub_topic: str, mqtt_sub_ip: str, mqtt_sub_port: int, mqtt_sub_topic: str, output_rtmp_url: str, invade_enable: bool, invade_file: str, camera_para_url: str, cv_frame_queue: Queue, event_queue: Queue, invade_queue: Queue, stop_event: Event, performance_counter): """进程A的异步主逻辑 - 高速版本""" logger.info(f"[ProcessA] 启动高速处理引擎") logger.info(f"[ProcessA] 目标FPS: 20+") # 初始化异步队列 cancel_flag = asyncio.Event() frame_queue = asyncio.Queue(maxsize=Config.FRAME_QUEUE_SIZE) processed_queue = asyncio.Queue(maxsize=Config.PROCESSED_QUEUE_SIZE * 2) # 增大队列 cv_async_frame_queue = asyncio.Queue(maxsize=Config.PROCESSED_QUEUE_SIZE) event_async_queue = asyncio.Queue(maxsize=Config.PROCESSED_QUEUE_SIZE) stream_containers: Dict[str, Any] = {} timestamp_frame_queue = TimestampedQueue(maxlen=1000) loop = asyncio.get_running_loop() try: # 加载侵限区域数据 list_points = [] camera_para = None if invade_file and camera_para_url: try: camera_file_path = downFile(camera_para_url) camera_para = read_camera_params(camera_file_path) except Exception as e: logger.warning(f"[ProcessA] 加载相机参数失败: {e}") # 初始化模型 detector = MultiYoloTrtDetectorTrackId_TRT8(model_configs) logger.info(f"[ProcessA] 模型加载完成") # 创建任务 tasks = [] process_task = None write_task = None # 初始化MQTT设备(如果需要) device = None if mqtt_sub_ip and mqtt_sub_port and mqtt_sub_topic: try: device = MQTTDevice(host=mqtt_sub_ip, port=mqtt_sub_port, client_id=f"{task_id}_sub") device.subscribe(topic=mqtt_sub_topic, callback=empty_osd_callback) logger.info(f"[ProcessA] MQTT订阅已初始化: {mqtt_sub_topic}") except Exception as e: logger.warning(f"[ProcessA] MQTT初始化失败: {e}") try: # 创建线程池 read_rtmp_executor = ThreadPoolExecutor(max_workers=Config.READ_RTMP_WORKERS, thread_name_prefix=f"{group_id}_read_rtmp") # 任务1: 拉流 read_task = asyncio.create_task( read_rtmp_frames( loop=loop, read_rtmp_frames_executor=read_rtmp_executor, video_url=video_url, device=device, topic_camera_osd=mqtt_sub_topic, method_camera_osd="drc_camera_osd_info_push", topic_osd_info=mqtt_sub_topic, method_osd_info="osd_info_push", cancel_flag=cancel_flag, frame_queue=frame_queue, timestamp_frame_queue=timestamp_frame_queue ), name=f"{group_id}_read_rtmp" ) tasks.append(read_task) # 高速处理任务 process_frame_executor = ThreadPoolExecutor(max_workers=Config.PROCESS_FRAME_WORKERS) process_task = asyncio.create_task( high_speed_process_frames(detector, cancel_flag, frame_queue, processed_queue), name=f"{group_id}_process_frames" ) tasks.append(process_task) # 高速推流任务 invade_state = bool(list_points) and invade_enable write_frame_executor = ThreadPoolExecutor(max_workers=Config.WRITE_FRAME_WORKERS) write_task = asyncio.create_task( write_results_to_rtmp_high_speed( task_id, output_rtmp_url, None, list_points, camera_para, invade_state, cancel_flag, processed_queue, invade_queue, cv_async_frame_queue, cv_frame_queue, event_queue, stream_containers ), name=f"{group_id}_write_rtmp" ) tasks.append(write_task) # 等待停止事件 while not (stop_event.is_set() or cancel_flag.is_set()): await asyncio.sleep(1) except asyncio.CancelledError: logger.info(f"[ProcessA] 任务被取消") except Exception as e: logger.error(f"[ProcessA] 任务异常: {e}") raise finally: # 停止所有任务 cancel_flag.set() for task in tasks: if task and not task.done(): task.cancel() # 等待任务结束 if tasks: await asyncio.gather(*tasks, return_exceptions=True) except Exception as e: logger.error(f"[ProcessA] 异常: {e}") raise finally: # 清理资源 if 'detector' in locals(): detector.destroy() logger.info(f"[ProcessA] 高速处理结束") def rtmp_process_main(group_id: str, video_url: str, task_id: str, model_configs: List[Dict], mqtt_pub_ip: str, mqtt_pub_port: int, mqtt_pub_topic: str, mqtt_sub_ip: str, mqtt_sub_port: int, mqtt_sub_topic: str, output_rtmp_url: str, invade_enable: bool, invade_file: str, camera_para_url: str, group_queues: Dict): """进程A的主函数 - 高速版本""" pid = os.getpid() logger.info(f"[ProcessA] 启动 (PID:{pid}),任务ID: {task_id}") # 检查CPU状态 check_cpu_affinity() try: asyncio.run(rtmp_process_async_high_speed( group_id, video_url, task_id, model_configs, mqtt_pub_ip, mqtt_pub_port, mqtt_pub_topic, mqtt_sub_ip, mqtt_sub_port, mqtt_sub_topic, output_rtmp_url, invade_enable, invade_file, camera_para_url, group_queues.get('cv_frame_queue'), group_queues.get('event_queue'), group_queues.get('invade_queue'), group_queues.get('stop_event'), group_queues.get('performance_counter') )) except Exception as e: logger.error(f"[ProcessA] 异常: {e}") error_queue = group_queues.get('error_queue') if error_queue: error_queue.put({ 'group_id': group_id, 'process': 'ProcessA', 'error': str(e), 'time': datetime.now() }) async def cut_evnt_video_publish(task_id, mqtt, mqtt_topic, cancel_flag: asyncio.Event, event_queue: asyncio.Queue = None, timestamp_frame_queue: TimestampedQueue = None): loop = asyncio.get_running_loop() upload_executor = ThreadPoolExecutor(max_workers=Config.UPLOAD_WORKERS) while not cancel_flag.is_set(): try: event = await event_queue.get() print(f"[cut循环] 成功获取事件: {event}") if not isinstance(event, dict) or "timestamp" not in event: print(f"[cut循环] 无效事件,跳过") continue timestamp = event.get("timestamp") if timestamp is None: print("⚠️ 警告:事件中缺少 timestamp") continue # 基于frame_len 控制生成视频长度 matched_items = timestamp_frame_queue.query_by_timestamp(timestamp, 150) print(f"[cut循环] 匹配到 {len(matched_items)} 个帧") if len(matched_items) == 0: continue frames = [item["frame"] for item in matched_items] if not frames: # 显式检查帧列表是否为空 print("⚠️ 警告:匹配到的帧列表为空") continue video_bytes = frames_to_video_bytes(frames, fps=25, format="mp4") if video_bytes is None: # 检查返回值 print(f"⚠ ️警告:视频生成失败,返回None {len(frames)}") continue async def upload_and_publish(): def upload_minio(): return upload_video_buff_from_buffer(video_bytes, video_format="mp4") try: minio_path, file_type = await loop.run_in_executor(upload_executor, upload_minio) # message = {"detection_id": timestamp, "minio_path": minio_path} message = { "task_id": task_id, "time": str(datetime.now()), "detection_id": timestamp, "minio": {"minio_path": minio_path, "minio_origin_path": minio_path, "file_type": file_type}, "box_detail": [{ }], "osd_location": { }, "des_location": [] } print(f"成功上传视频: {message}") await mqtt.publish(mqtt_topic, json.dumps(message, ensure_ascii=False)) except Exception as e: print(f"上传失败: {e}") asyncio.create_task(upload_and_publish()) except Exception as e: print(f"[cut循环] 处理事件出错: {e}") finally: if event is not None: event_queue.task_done() print(f"[cut循环] 确认消费,剩余未完成任务: {event_queue._unfinished_tasks}") if event_queue.qsize() > 0 and event_queue._unfinished_tasks == 0: try: event_queue.put_nowait(None) dummy = await event_queue.get() event_queue.task_done() except: pass def get_local_drc_message(): global local_drc_message if local_drc_message is not None: mess = local_drc_message local_drc_message = None return mess return None async def send_frame_to_s3_mq(loop, upload_executor, task_id, mqtt, mqtt_topic, cancel_flag: asyncio.Event, cv_frame_queue: asyncio.Queue, event_queue: asyncio.Queue = None, device_height: float = float(200), repeat_dis: float = -1, repeat_time: float = -1): global stats start_time = time.time() # executor = ThreadPoolExecutor(max_workers=Config.MAX_WORKERS) # upload_executor = ThreadPoolExecutor(max_workers=Config.UPLOAD_WORKERS) track_filter = TrackIDEventFilter(max_inactive_time=10) reported_track_ids = defaultdict(float) count_pic = 0 report_interval = 8 # loop = asyncio.get_running_loop() local_func_cache = { "func_100000": None, "func_100004": None, # 存储缓存,缓存人员track_id "func_100006": None # 存储缓存,缓存车辆track_id } para = { "category": 3 } target_location_back = [] # 本地缓存,用作位置重复计算 current_time_second = int(time.time()) repeat_time_count = 0 while not cancel_flag.is_set(): upload_start = time.time() try: # 检查队列长度,避免堆积 if cv_frame_queue.qsize() > Config.PROCESSED_QUEUE_SIZE // 2: print(f"警告:cv_frame_queue 积压(当前长度={cv_frame_queue.qsize()}),清空队列") while not cv_frame_queue.empty(): await cv_frame_queue.get() print("队列已清空,等待新数据...") await asyncio.sleep(0.1) continue # 获取队列数据 try: cv_frame = await asyncio.wait_for(cv_frame_queue.get(), timeout=0.05) # 检查数据类型 if not isinstance(cv_frame, dict): print(f"⚠️ 警告:cv_frame 不是字典,而是 {type(cv_frame)}") continue except asyncio.TimeoutError: continue if repeat_time > 0: # 基于时间的重复计算,是否使能 read_cv_time_second = int(time.time()) # print(f"read_cv_time_secondread_cv_time_second {read_cv_time_second}") if read_cv_time_second - current_time_second < repeat_time and repeat_time_count > 0: # print("触发事件去重") continue else: print("没有触发事件去重") repeat_time_count += 1 # 防止丢失第一个目标 current_time_second = read_cv_time_second # 准备数据 frame_copy = cv_frame['frame_copy'] frame = cv_frame['frame'] target_point = cv_frame['target_point'] detections = cv_frame['detections'] detections_list = cv_frame['detections_list'] model_para_list = cv_frame.get('model_para', {}) # 默认空字典 timestamp = cv_frame.get('timestamp', time.time_ns()) # 默认空字典 air_alti = cv_frame['osd_info'] img_height, img_width = frame.shape[:2] gimbal_yaw = air_alti.gimbal_yaw gimbal_pitch = air_alti.gimbal_pitch gimbal_roll = air_alti.gimbal_roll height = air_alti.height cam_longitude = air_alti.longitude cam_latitude = air_alti.latitude # 初始化默认值 frame11 = frame_copy # 默认使用原始帧 box_detail = [] # 默认空列表 current_time = time.time() h = device_height us = [] vs = [] heights = [] should_report = True print(f"target_pointtarget_point {len(target_point)}") count_item = 0 des_location_result = [] for item in target_point: # # 跳过无效的track_id # 检查是否应该上报该track_id u = item["u"] v = item["v"] cls_id = item["cls_id"] track_id = item["track_id"] new_track_id = item["new_track_id"] # should_report = True # # 如果这个track_id已经上报过,检查是否超过上报间隔 # if new_track_id in reported_track_ids: # last_report_time = reported_track_ids[new_track_id] # if current_time - last_report_time < report_interval: # print(f"基于track_id,触发去重事件:{new_track_id}") # should_report = False # print(f"new_track_idnew_track_id {new_track_id} {track_id}") # if should_report and track_filter.should_report(new_track_id): # should_report = True print(f"方法 send_frame——to {count_item}") count_item += 1 if not track_filter.should_report(track_id): should_report = False break # if track_id < 0: # 适配MultiYOLODetector类,该类不支持追踪,默认track_id为-1 # should_report = True # 如果使用TrackIDEventFilter判断需要上报 if should_report: us.append(u) vs.append(v) heights.append(h) # location_results = cal_canv_location_by_osd(us, vs, gimbal_pitch, gimbal_yaw, gimbal_roll, # height, cam_longitude, cam_latitude, img_width, img_height, # heights) # # if not location_results: # print("location_results is null") # continue if len(us) > 0: print("进行侵限计算") location_results = cal_canv_location_by_osd(us, vs, gimbal_pitch, gimbal_yaw, gimbal_roll, height, cam_longitude, cam_latitude, img_width, img_height, heights) if not location_results: continue if location_results: des1 = location_results[0] des1_longitude = des1[0] des1_latitude = des1[1] des1_height = des1[2] des_location_result.append({"longitude": des1_longitude, "latitude": des1_latitude}) repeat_state = False show_des = 0 str_loca = "" if repeat_dis > 0: # ai_model_list repeat_dis 字段大于零,才启用去重 if len(target_location_back) > 0: # 当前逻辑并不严谨,只是比较了第一个位置信息 des1_back = target_location_back[0] des1_back_longitude = des1_back[0] des1_back_latitude = des1_back[1] des1_back_height = des1_back[2] des1 = location_results[0] des1_longitude = des1[0] des1_latitude = des1[1] des1_height = des1[2] des_latitude = [] str_loca = f"{des1_back_longitude}:{des1_back_latitude}---{des1_longitude}{des1_latitude}" des = haversine(des1_back_longitude, des1_back_latitude, des1_longitude, des1_latitude) show_des = des if des < repeat_dis: print(f"触发基于坐标判断重复,坐标距离:{des}") repeat_state = True else: target_location_back = location_results # 未触发去重逻辑,即更新本地位置缓存 else: target_location_back = location_results # 基于坐标位置去重判断,如果失败就缓存本次位置信息 if not repeat_state: # 未触发去重逻辑,即执行图像上传逻辑 for idx, model_para in enumerate(model_para_list): if not isinstance(detections_list, list) or idx >= len(detections_list): continue detections = detections_list[idx] # 正确获取当前模型的检测结果 chinese_label = model_para.get("model_chinese_labe", {}) model_cls = model_para.get("model_cls_index", {}) list_func_id = model_para.get("model_list_func_id", -11) func_id = model_para.get("func_id", []) # 获取DRC消息(同步操作,放到线程池) local_drc_message = await loop.run_in_executor(upload_executor, get_local_drc_message) if detections is None or len(detections.boxes) < 1: continue try: # 图像处理和结果计算 frame11, box_detail1 = await loop.run_in_executor( upload_executor, cal_tricker_results, frame_copy, detections, None, func_id, local_func_cache, para, model_cls, chinese_label, list_func_id ) print(f"cal_tricker_results") print(f"func_id {func_id}") print(f"local_func_cache {local_func_cache}") print(f"para {para}") print(f"model_cls {model_cls}") print(f"chinese_label {chinese_label}") print(f"list_func_id {list_func_id}") print(f"box_detail1 {len(box_detail1)}") except Exception as e: print(f"处理帧时出错: {str(e)}") import traceback traceback.print_exc() continue count_pic = count_pic + 1 # 图像编码 def encode_frame(): success, buffer = cv2.imencode(".jpg", frame11) return buffer.tobytes() if success else None def encode_origin_frame(): success, buffer = cv2.imencode(".jpg", frame) return buffer.tobytes() if success else None buffer_bytes = await loop.run_in_executor(upload_executor, encode_frame) buffer_origin_bytes = await loop.run_in_executor(upload_executor, encode_origin_frame) if not buffer_bytes: continue # 并行处理上传和MQTT发布 async def upload_and_publish(): # 上传到MinIO def upload_minio(): minio_path, file_type = upload_frame_buff_from_buffer(buffer_bytes, None) minio_origin_path, file_type = upload_frame_buff_from_buffer(buffer_origin_bytes, None) return minio_path, minio_origin_path, file_type minio_path, minio_origin_path, file_type = await loop.run_in_executor( upload_executor, upload_minio ) # 构造消息 message = { "task_id": task_id, "time": str(datetime.now()), "detection_id": timestamp, "minio": {"minio_path": minio_path, "minio_origin_path": minio_origin_path, "file_type": file_type}, "box_detail": box_detail1, "uav_location": local_drc_message, "osd_location": { "longitude": cam_longitude, "latitude": cam_latitude }, "des_location": des_location_result } await event_queue.put({ "timestamp": timestamp # 存储事件触发的时刻,用作视频制作 }) message_json = json.dumps(message, ensure_ascii=False) await mqtt.publish(mqtt_topic, message_json) asyncio.create_task(upload_and_publish()) # 使用共享变量时加锁,进而进行跳帧,不然上报太频繁 # async with cache_lock: # # 读取或修改共享变量 # send_count = shared_local_cache["send_count"] # send_count = send_count + 1 # shared_local_cache["send_count"] = send_count # # # 伪代码,目前安全帽比较难识别到 # if 100014 == model_para_list[0]["model_list_func_id"]: # if send_count > 2: # # 创建独立任务执行上传和发布 # shared_local_cache["send_count"] = 0 # asyncio.create_task(upload_and_publish()) # else: # if send_count > 30: # # 创建独立任务执行上传和发布 # shared_local_cache["send_count"] = 0 # asyncio.create_task(upload_and_publish()) if 'frame' in locals(): upload_time = time.time() - upload_start print(f"上传耗时: {upload_time:.4f}s") except Exception as e: print(f"send_frame_to_s3_mq 错误: {e}") import traceback traceback.print_exc() await asyncio.sleep(0.1) except asyncio.CancelledError: print("send_frame_to_s3 收到取消信号") raise # finally: # log_perf('send_frame_to_s3_mq', start_time) # 更新性能统计 stats['processed'] += 1 if time.time() - stats['last_time'] >= 1.0: stats['avg_fps'] = stats['processed'] / (time.time() - stats['last_time']) print(f"处理速度: {stats['avg_fps']:.2f} FPS") stats['processed'] = 0 stats['last_time'] = time.time() print("send_frame_to_s3线程已停止") async def cal_des_invade(loop, invade_executor, task_id: str, mqtt, mqtt_publish_topic, list_points: list[list[any]], camera_para: Camera_Para, model_count: int, cancel_flag: asyncio.Event = None, invade_queue: asyncio.Queue = None, event_queue: asyncio.Queue = None, device_height: float = float(200), repeat_dis: float = -1, repeat_time: float = -1): # loop = asyncio.get_running_loop() # upload_executor = ThreadPoolExecutor(max_workers=Config.UPLOAD_WORKERS) pic_count_hongxian = 0 track_filter = TrackIDEventFilter(max_inactive_time=8.0) # 用于记录已上报的track_id及其上报时间 reported_track_ids = defaultdict(float) # 上报间隔时间(秒) report_interval = 8 target_location_back = [] # 本地缓存,用作位置重复计算 current_time_second = int(time.time()) while not cancel_flag.is_set(): # 检查队列长度,避免堆积 if invade_queue.qsize() > Config.PROCESSED_QUEUE_SIZE // 2: print(f"警告:invade_queue 积压(当前长度={invade_queue.qsize()}),清空队列") while not invade_queue.empty(): await invade_queue.get() print("队列已清空,等待新数据...") await asyncio.sleep(0.1) continue # 获取队列数据 try: cv_frame = await asyncio.wait_for(invade_queue.get(), timeout=0.8) # 检查数据类型 if not isinstance(cv_frame, dict): print(f"⚠️ 警告:cv_frame 不是字典,而是 {type(cv_frame)}") continue except asyncio.TimeoutError: print(f"cal_des_invade TimeoutError") continue except asyncio.CancelledError: print("cal_des_invade 收到取消信号") raise if cv_frame is None: continue if repeat_time > 0: # 基于时间的重复计算,是否使能 read_cv_time_second = int(time.time()) if read_cv_time_second - current_time_second < repeat_time: continue else: current_time_second = read_cv_time_second # print("cal_des_invade inside") frame_copy = cv_frame['frame_copy'] frame = cv_frame['frame'] target_point = cv_frame['target_point'] message_point = cv_frame['message_point'] cls_count = cv_frame['cls_count'] model_list_func_id = cv_frame['model_list_func_id'] model_func_id = cv_frame['model_func_id'] air_alti = cv_frame['osd_info'] detections = cv_frame['detections'] detections_list = cv_frame['detections_list'] model_para_list = cv_frame.get('model_para', {}) # 默认空字典 timestamp = cv_frame.get('timestamp', time.time_ns()) # 默认空字典 if not isinstance(frame, np.ndarray) or frame.size == 0: print("⚠️ 警告:frame不是有效的numpy数组") continue # 获取宽高 img_height, img_width = frame.shape[:2] if not air_alti: continue gimbal_yaw = air_alti.gimbal_yaw gimbal_pitch = air_alti.gimbal_pitch gimbal_roll = air_alti.gimbal_roll height = air_alti.height cam_longitude = air_alti.longitude cam_latitude = air_alti.latitude try: current_time = time.time() h = device_height us = [] vs = [] heights = [] should_report = True for item in target_point: # # 跳过无效的track_id # 检查是否应该上报该track_id u = item["u"] v = item["v"] cls_id = item["cls_id"] track_id = item["track_id"] new_track_id = item["new_track_id"] # # 如果这个track_id已经上报过,检查是否超过上报间隔 # # if new_track_id in reported_track_ids: # if track_id in reported_track_ids: # last_report_time = reported_track_ids[track_id] # if current_time - last_report_time < report_interval: # print(f"基于track_id,触发去重事件:{track_id}") # should_report = False print(f"方法 cal_des_invade") if not track_filter.should_report(track_id): should_report = False break # if track_id < 0: # 适配MultiYOLODetector类,该类不支持追踪,默认track_id为-1 # should_report = True # 如果使用TrackIDEventFilter判断需要上报 if should_report: us.append(u) vs.append(v) heights.append(h) if should_report: print("进行侵限计算") location_results = cal_canv_location_by_osd(us, vs, gimbal_pitch, gimbal_yaw, gimbal_roll, height, cam_longitude, cam_latitude, img_width, img_height, heights) if not location_results: continue # for point in location_results: # target_location.append({point[1], point[2]}) # point_list = [] # 整理红线集合 repeat_state = False show_des = 0 str_loca = "" des_location_result = [] if repeat_dis > 0: # ai_model_list repeat_dis 字段大于零,才启用去重 if len(target_location_back) > 0: # 当前逻辑并不严谨,只是比较了第一个位置信息 des1_back = target_location_back[0] des1_back_longitude = des1_back[0] des1_back_latitude = des1_back[1] des1_back_height = des1_back[2] des1 = location_results[0] des1_longitude = des1[0] des1_latitude = des1[1] des_location_result.append({"longitude": des1_longitude, "latitude": des1_latitude}) des1_height = des1[2] str_loca = f"{des1_back_longitude}:{des1_back_latitude}---{des1_longitude}{des1_latitude}" des = haversine(des1_back_longitude, des1_back_latitude, des1_longitude, des1_latitude) show_des = des if des < repeat_dis: print(f"触发基于坐标判断重复,坐标距离:{des}") repeat_state = True else: target_location_back = location_results # 未触发去重逻辑,即更新本地位置缓存 else: target_location_back = location_results # 基于坐标位置去重判断,如果失败就缓存本次位置信息 # 测试红线 # for point in results: # point_list.append([point["u"], point["v"]]) # for point in message_point: # cv2.rectangle(frame_copy, (point["box"][0], point["box"][1]), # (point["box"][2], point["box"][3]), (0, 255, 255), 2) # cv2.polylines(frame_copy, [np.array(point_list, dtype=np.int32)], # isClosed=True, color=(0, 0, 255), thickness=2) pic_count_hongxian = pic_count_hongxian + 1 # cv2.imwrite(f"save_pic\invade-hongxian\hongxianongly-{pic_count_hongxian}.jpg", frame_copy) # if len(invade_point) > 0: # print("hongxianhongxianhongxianhongxian") # pic_count = pic_count + 1 # # cv2.imwrite(f"save_pic\invade\hongxian-{pic_count}.jpg", frame) # drawn_frame = frame_copy # 关键修复:深拷贝绘制后的帧 # 图像编码 if not repeat_state: # 未触发去重逻辑,即执行图像上传逻辑 print(f"未发现重复,现在上传{str_loca}======{show_des}") def encode_origin_frame(): success, buffer = cv2.imencode(".jpg", frame) return buffer.tobytes() if success else None def encode_frame(): success, buffer = cv2.imencode(".jpg", frame_copy) # if task_id == "2a5d7a80-109a-4cd6-aa95-0e8c9aab6b3f-1": # 测试安全帽 # cv2.imwrite(f"save_pic\qm\hongxian-encode-{pic_count_hongxian}.jpg", frame) # # if task_id == "7eecadd6-001f-488c-bed9-1086079c3450-1": # 测试工地车辆 # cv2.imwrite(f"save_pic\gdcl\hongxian-encode-{pic_count_hongxian}.jpg", frame_copy) return buffer.tobytes() if success else None buffer_bytes = await loop.run_in_executor(invade_executor, encode_frame) buffer_origin_bytes = await loop.run_in_executor(invade_executor, encode_origin_frame) if not buffer_bytes: continue # 并行处理上传和MQTT发布 async def upload_and_publish(): # 上传到MinIO def upload_minio(): minio_path, file_type = upload_frame_buff_from_buffer(buffer_bytes, None) minio_origin_path, file_type = upload_frame_buff_from_buffer(buffer_origin_bytes, None) return minio_path, minio_origin_path, file_type # return upload_frame_buff_from_buffer(buffer_bytes, None) minio_path, minio_origin_path, file_type = await loop.run_in_executor( invade_executor, upload_minio ) print(f"minio_pathminio_pathminio_pathminio_path {minio_path}") # 构造消息 message = { "task_id": task_id, "time": str(datetime.now()), "detection_id": timestamp, "minio": {"minio_path": minio_path, "minio_origin_path": minio_origin_path, "file_type": file_type}, "box_detail": [{ "model_id": model_func_id, "cls_count": cls_count, "box_count": [message_point], # 特殊处理 "location_results": location_results # 增加经纬度信息 }], "osd_location": { "longitude": cam_longitude, "latitude": cam_latitude }, "des_location": des_location_result } if not event_queue.full(): await event_queue.put({ "timestamp": timestamp # 存储事件触发的时刻,用作视频制作 }) else: logger.warning("event_queue 帧队列已满,等待1ms后重试") await asyncio.sleep(0.001) print(f"hongxianhongxianhongxianhongxian上传 {message}:{event_queue.qsize()} ") message_json = json.dumps(message, ensure_ascii=False) await mqtt.publish(mqtt_publish_topic, message_json) asyncio.create_task(upload_and_publish()) # 使用共享变量时加锁,进而进行跳帧,不然上报太频繁 # async with invade_cache_lock: # # 读取或修改共享变量 # send_count = shared_local_cache["invade_send_count"] # send_count = send_count + 1 # shared_local_cache["invade_send_count"] = send_count # if send_count > 1: # # 创建独立任务执行上传和发布 # shared_local_cache["invade_send_count"] = 0 # asyncio.create_task(upload_and_publish()) except Exception as e: print(f"cal_des_invade 错误: {e}") import traceback traceback.print_exc() await asyncio.sleep(0.1) print("cal_des_invade读取线程已停止") def upload_process_main(group_id: str, task_id: str, mqtt_pub_ip: str, mqtt_pub_port: int, mqtt_pub_topic: str, device_height: float, repeat_dis: float, repeat_time: float, group_queues: Dict): """进程B的主函数 - 优化版本(集成三个异步方法)""" pid = os.getpid() logger.info(f"[ProcessB] 启动 (PID:{pid}),任务ID: {task_id}") # 检查CPU状态 check_cpu_affinity() # 设置低优先级 try: os.nice(19) # 最低优先级 except: pass async def run_upload_tasks(): try: # 初始化MQTT服务 mqtt = MQTTService(mqtt_pub_ip, port=mqtt_pub_port) await mqtt.connect() # 初始化队列和事件 stop_event = group_queues.get('stop_event') cv_frame_queue = group_queues.get('cv_frame_queue') invade_queue = group_queues.get('invade_queue') event_queue = group_queues.get('event_queue') timestamp_frame_queue = group_queues.get('timestamp_frame_queue') # 创建线程池执行器 loop = asyncio.get_running_loop() upload_executor = ThreadPoolExecutor(max_workers=Config.UPLOAD_WORKERS) invade_executor = ThreadPoolExecutor(max_workers=Config.INVADE_WORKERS) event_video_executor = ThreadPoolExecutor(max_workers=Config.EVENT_VIDEO_WORKERS) # 创建取消事件 cancel_flag = asyncio.Event() # 创建三个主要任务 tasks = [ # 任务1: 处理侵限事件 asyncio.create_task( cal_des_invade( loop=loop, invade_executor=invade_executor, task_id=task_id, mqtt=mqtt, mqtt_publish_topic=mqtt_pub_topic, list_points=[], # 需要从外部传入 camera_para=None, # 需要从外部传入 model_count=1, # 需要根据实际情况设置 cancel_flag=cancel_flag, invade_queue=invade_queue, event_queue=event_queue, device_height=device_height, repeat_dis=repeat_dis, repeat_time=repeat_time ), name=f"{group_id}_cal_des_invade" ), # 任务2: 发送帧到S3和MQTT asyncio.create_task( send_frame_to_s3_mq( loop=loop, upload_executor=upload_executor, task_id=task_id, mqtt=mqtt, mqtt_topic=mqtt_pub_topic, cancel_flag=cancel_flag, cv_frame_queue=cv_frame_queue, event_queue=event_queue, device_height=device_height, repeat_dis=repeat_dis, repeat_time=repeat_time ), name=f"{group_id}_send_frame_to_s3_mq" ), # 任务3: 切割事件视频并发布 asyncio.create_task( cut_evnt_video_publish( task_id=task_id, mqtt=mqtt, mqtt_topic=mqtt_pub_topic, cancel_flag=cancel_flag, event_queue=event_queue, timestamp_frame_queue=timestamp_frame_queue ), name=f"{group_id}_cut_evnt_video_publish" ) ] # 监控停止事件 while not stop_event.is_set(): await asyncio.sleep(0.1) # 收到停止信号,取消所有任务 cancel_flag.set() for task in tasks: task.cancel() # 等待任务结束 await asyncio.gather(*tasks, return_exceptions=True) except Exception as e: logger.error(f"[ProcessB] 异常: {e}") error_queue = group_queues.get('error_queue') if error_queue: error_queue.put({ 'group_id': group_id, 'process': 'ProcessB', 'error': str(e), 'time': datetime.now() }) finally: # 清理资源 if 'mqtt' in locals(): await mqtt.disconnect() if 'upload_executor' in locals(): upload_executor.shutdown(wait=True) if 'invade_executor' in locals(): invade_executor.shutdown(wait=True) if 'event_video_executor' in locals(): event_video_executor.shutdown(wait=True) try: # 运行异步上传任务 asyncio.run(run_upload_tasks()) except Exception as e: logger.error(f"[ProcessB] 主循环异常: {e}") error_queue = group_queues.get('error_queue') if error_queue: error_queue.put({ 'group_id': group_id, 'process': 'ProcessB', 'error': str(e), 'time': datetime.now() }) def start_rtmp_processing_with_groups(video_url: str, task_id: str, model_configs: List[Dict], mqtt_pub_ip: str, mqtt_pub_port: int, mqtt_pub_topic: str, mqtt_sub_ip: str, mqtt_sub_port: int, mqtt_sub_topic: str, output_rtmp_url: str, invade_enable: bool, invade_file: str, camera_para_url: str, device_height: float, repeat_dis: float, repeat_time: float, process_group_manager: ProcessGroupManager = None): """启动RTMP处理进程组""" if process_group_manager is None: process_group_manager = ProcessGroupManager() group_id = f"group_{task_id}" # 创建进程组 result = process_group_manager.create_process_group( group_id=group_id, video_url=video_url, task_id=task_id, model_configs=model_configs, mqtt_pub_ip=mqtt_pub_ip, mqtt_pub_port=mqtt_pub_port, mqtt_pub_topic=mqtt_pub_topic, mqtt_sub_ip=mqtt_sub_ip, mqtt_sub_port=mqtt_sub_port, mqtt_sub_topic=mqtt_sub_topic, output_rtmp_url=output_rtmp_url, invade_enable=invade_enable, invade_file=invade_file, camera_para_url=camera_para_url, device_height=device_height, repeat_dis=repeat_dis, repeat_time=repeat_time ) return process_group_manager, result if __name__ == '__main__': # 系统检查 logger.info("=== 高性能AI视频处理系统 ===") logger.info(f"Python版本: {sys.version}") logger.info(f"操作系统: {sys.platform}") # CPU信息 total_cpus = psutil.cpu_count(logical=False) logical_cpus = psutil.cpu_count(logical=True) logger.info(f"CPU核心: {total_cpus}物理/{logical_cpus}逻辑") # 检查隔离支持 try: with open('/proc/cmdline', 'r') as f: cmdline = f.read() if 'isolcpus=' in cmdline: logger.info("✅ 系统支持CPU隔离") else: logger.info("⚠️ 未启用isolcpus,使用软件隔离") except: pass multiprocessing.freeze_support() multiprocessing.set_start_method('spawn', force=True) process_group_manager = ProcessGroupManager() # 配置参数 model_configs = [{ 'path': r"/mnt/mydisk1/dj/ai2/pt/smoke.pt", 'engine_path': r"/mnt/mydisk1/dj/ai2/engine/smoke/smoke.engine", 'so_path': r"/mnt/mydisk1/dj/ai2/engine/smoke/libmyplugins.so", # 'path': r"D:\project\AI-PYTHON\Ai_tottle\pt\smoke.pt", # 'engine_path': r"D:\project\AI-PYTHON\Ai_tottle\engine\smoke\smoke.engine", # 'so_path': r"D:\project\AI-PYTHON\Ai_tottle\engine\smoke\myplugins.dll", 'cls_map': {'hote(infrared)&fire': '火', 'smoke': '烟'}, 'allowed_classes': ['smoke', 'hote(infrared)&fire'], "cls_index": [0, 1], "class_names": ['smoke', 'hote(infrared)&fire'], "chinese_label": {0: '烟', 1: '火'}, "list_func_id": 100000, "func_id": 100041, "para_invade_enable": False, "config_conf": 0.4 }] video_url = "rtmp://222.212.85.86:1935/live/12345678" # video_url = "rtmp://192.168.0.11:1935/live/12345678" task_id = "441ba16f-757f-481f-8f73-68a885fcc229-test" mqtt_pub_ip = "47.108.62.6" mqtt_pub_port = 12503 mqtt_pub_topic = "thing/product/ai/events" mqtt_sub_ip = "47.108.62.6" mqtt_sub_port = 12503 mqtt_sub_topic = "thing/product/8UUXN6S00A0CK7TEST/drc/up" output_rtmp_url = "rtmp://222.212.85.86:1935/live/1234567890ai123" invade_enable = True invade_file = "meta_data/校友之家耕地占用demo.geojson" camera_para_url = "meta_data/camera_para/xyzj_camera_para.txt" device_height = 600 repeat_dis = 20 repeat_time = 20 # 启动进程组 process_group_manager, group_id1 = start_rtmp_processing_with_groups( video_url, task_id, model_configs, mqtt_pub_ip, mqtt_pub_port, mqtt_pub_topic, mqtt_sub_ip, mqtt_sub_port, mqtt_sub_topic, output_rtmp_url, invade_enable, invade_file, camera_para_url, device_height, repeat_dis, repeat_time, process_group_manager ) if group_id1: logger.info(f"✅ 进程组 {group_id1} 启动成功") logger.info(f"进程A: 高速推理 (CPU 0-3)") logger.info(f"进程B: 后台上传 (CPU 4-5)") else: logger.error("❌ 进程组启动失败") sys.exit(1) try: # 主监控 monitor_count = 0 while True: time.sleep(1) monitor_count += 1 if monitor_count % 10 == 0: # 每10秒打印一次 logger.info(f"\n=== 系统监控 ({datetime.now()}) ===") # 系统CPU使用率 cpu_percent = psutil.cpu_percent(interval=1, percpu=True) # 只显示前6个CPU核心 cpu_display = [f'{p:.1f}%' for p in cpu_percent[:6]] logger.info(f"CPU 0-5 使用率: {cpu_display}") except KeyboardInterrupt: logger.info("\n=== 收到停止信号,正在停止... ===") process_group_manager.stop_all_groups(timeout=10) logger.info("=== 所有进程已停止 ===") except Exception as e: logger.error(f"主进程异常: {e}") process_group_manager.stop_all_groups(timeout=10)