增加超限施工功能

This commit is contained in:
martin 2026-04-25 21:37:18 +08:00
parent 9720a07683
commit fc534a096e
2 changed files with 204 additions and 100 deletions

View File

@ -821,6 +821,7 @@ async def read_video_frames(task_id, mqtt, mqtt_publish_topic,
if os.path.exists(local_video_path): if os.path.exists(local_video_path):
os.remove(local_video_path) os.remove(local_video_path)
# #
# async def read_rtmp_frames( # async def read_rtmp_frames(
# loop, # loop,
@ -1002,7 +1003,6 @@ async def read_video_frames(task_id, mqtt, mqtt_publish_topic,
# logger.info(f"RTMP 流已结束或被取消,累计处理帧数: {pic_count}") # logger.info(f"RTMP 流已结束或被取消,累计处理帧数: {pic_count}")
# ------------------------------- 下述方法使用ffmpeg 拉流可以解决cv2拉流的一些问题主要是虚拟环境ffmpeg不匹配的问题。但是ffmpeg拉流慢3s左右 # ------------------------------- 下述方法使用ffmpeg 拉流可以解决cv2拉流的一些问题主要是虚拟环境ffmpeg不匹配的问题。但是ffmpeg拉流慢3s左右
# import cv2 # import cv2
@ -1393,6 +1393,7 @@ import cv2
import asyncio import asyncio
from typing import Optional from typing import Optional
from concurrent.futures import ThreadPoolExecutor from concurrent.futures import ThreadPoolExecutor
# 使用cv2 拉流避免了ffmpeg 拉流的rtmp延时3s的问题 # 使用cv2 拉流避免了ffmpeg 拉流的rtmp延时3s的问题
# async def read_rtmp_frames( # async def read_rtmp_frames(
# loop, # loop,
@ -1624,6 +1625,7 @@ TARGET_FPS = 25
FOURCC = cv2.VideoWriter_fourcc(*'H264') FOURCC = cv2.VideoWriter_fourcc(*'H264')
MAX_CORRUPTED = 30 MAX_CORRUPTED = 30
# #
# async def read_rtmp_frames( # async def read_rtmp_frames(
# loop, # loop,
@ -1902,6 +1904,7 @@ def init_capture_with_sei_fix(video_url: str, attempt: int = 1):
print(f"拉流成功:分辨率 {width}x{height}") print(f"拉流成功:分辨率 {width}x{height}")
return cap, (width, height) return cap, (width, height)
def ensure_cv8uc3(frame): def ensure_cv8uc3(frame):
"""确保帧格式为8位3通道BGR""" """确保帧格式为8位3通道BGR"""
if frame is None or frame.size == 0: if frame is None or frame.size == 0:
@ -1939,8 +1942,6 @@ async def read_rtmp_frames(
# 创建预览队列 # 创建预览队列
preview_task = None preview_task = None
# 初始化捕获器 # 初始化捕获器
cap = None cap = None
width, height = 1280, 720 width, height = 1280, 720
@ -2108,8 +2109,6 @@ async def read_rtmp_frames(
print("RTMP流读取已停止") print("RTMP流读取已停止")
# #
# async def read_rtmp_frames_skip_sei( # async def read_rtmp_frames_skip_sei(
# loop, # loop,
@ -2344,7 +2343,6 @@ async def read_rtmp_frames(
# logger.info(f"RTMP 流已结束,累计处理帧数: {frame_count}") # logger.info(f"RTMP 流已结束,累计处理帧数: {frame_count}")
# async def process_frames(detector: MultiYOLODetector): # async def process_frames(detector: MultiYOLODetector):
# async def process_frames(detector: MultiYOLODetector_TrackId, cancel_flag: asyncio.Event, # async def process_frames(detector: MultiYOLODetector_TrackId, cancel_flag: asyncio.Event,
# frame_queue: asyncio.Queue, processed_queue: asyncio.Queue): # frame_queue: asyncio.Queue, processed_queue: asyncio.Queue):
@ -2502,7 +2500,7 @@ class TrackIDEventFilter:
async def write_results_to_rtmp(task_id: str, output_url: str = None, input_fps: float = None, async def write_results_to_rtmp(task_id: str, output_url: str = None, input_fps: float = None,
list_points: list[list[any]] = None, camera_para: Camera_Para = None, list_points: list[list[any]] = None, camera_para: Camera_Para = None,
invade_state: bool = False, cancel_flag: asyncio.Event = None, invade_state: bool = False, invade_switch: int = 0, cancel_flag: asyncio.Event = None,
processed_queue: asyncio.Queue = None, invade_queue: asyncio.Queue = None, processed_queue: asyncio.Queue = None, invade_queue: asyncio.Queue = None,
cv_frame_queue: asyncio.Queue = None, stream_containers: Dict[str, Any] = None): cv_frame_queue: asyncio.Queue = None, stream_containers: Dict[str, Any] = None):
# global stream_containers, count_pic # global stream_containers, count_pic
@ -2560,6 +2558,9 @@ async def write_results_to_rtmp(task_id: str, output_url: str = None, input_fps:
results = [] results = []
results_list = [] results_list = []
invade_switch_enable = True # 侵限施工
if invade_switch > 0:
invade_switch_enable = False # 超限施工
# 启用侵限且拿到了飞机的姿态信息,再绘制红线 # 启用侵限且拿到了飞机的姿态信息,再绘制红线
if invade_state and osd_info: if invade_state and osd_info:
gimbal_yaw = osd_info.gimbal_yaw gimbal_yaw = osd_info.gimbal_yaw
@ -2615,6 +2616,7 @@ async def write_results_to_rtmp(task_id: str, output_url: str = None, input_fps:
model_func_id = model_para[0]["func_id"] model_func_id = model_para[0]["func_id"]
invade_point = [] invade_point = []
message_point = [] message_point = []
invade_point_message_point=[] # 超限使能,统计侵限,方便画图
target_point = [] # 存储满足条件的图像坐标,方便后续经纬度转换 target_point = [] # 存储满足条件的图像坐标,方便后续经纬度转换
cls_count = 0 cls_count = 0
@ -2649,13 +2651,16 @@ async def write_results_to_rtmp(task_id: str, output_url: str = None, input_fps:
is_invade = is_point_in_polygonlist(point_x, point_y, results_list) is_invade = is_point_in_polygonlist(point_x, point_y, results_list)
# is_invade = is_point_in_polygon(point_x, point_y, results) # is_invade = is_point_in_polygon(point_x, point_y, results)
# print(f"is_invadeis_invadeis_invade {is_invade} {len(results)}") # print(f"is_invadeis_invadeis_invade {is_invade} {len(results)}")
if is_invade:
if invade_switch_enable:#只关注侵限
if is_invade: #只关注侵限且实际发生侵限
# if invade_switch_enable: # 只关注侵限
cls_count += 1 cls_count += 1
invade_point.append({ # invade_point.append({
"u": point_x, # "u": point_x,
"v": point_y, # "v": point_y,
"class_name": class_name # "class_name": class_name
}) # })
target_point.append({ target_point.append({
"u": point_x, "u": point_x,
"v": point_y, "v": point_y,
@ -2682,6 +2687,88 @@ async def write_results_to_rtmp(task_id: str, output_url: str = None, input_fps:
text_x = x1 text_x = x1
text_y = y1 - 5 text_y = y1 - 5
# 如果文本超出图像顶部,则放在框内部下方
if text_y < 0:
text_y = y2 + text_height + 5
temp_img = frame_copy.copy()
frame_copy = put_chinese_text(
temp_img,
# label, # 置信度、类别、用作测试
# "", # 注释掉汉字
label_name, # 仅显示汉字
(text_x, text_y - 40),
)
else: #只关注超限
if is_invade: #只关注超限的情况下,发生了侵限行为,只在图像上展示侵限,不做行为记录
# if invade_switch_enable: # 只关注侵限
# cls_count += 1
# target_point.append({
# "u": point_x,
# "v": point_y,
# "cls_id": cls_id,
# "track_id": track_id,
# "new_track_id": new_track_id
# }) # 对于侵限,只存储侵限目标
# model_list_func_id = model_para[0]["model_list_func_id"]
# model_func_id = model_para[0]["func_id"]
invade_point_message_point.append({
"confidence": float(confidence),
"cls_id": cls_id,
"type_name": en_name,
"track_id": track_id,
"box": [x1, y1, x2, y2]
})
label = f"{en_name}:{confidence:.2f}:{track_id}"
label_name = f"{en_name}"
# 计算文本位置
text_size = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, fontScale=8, thickness=4)[
0]
text_width, text_height = text_size[0], text_size[1]
text_x = x1
text_y = y1 - 5
# 如果文本超出图像顶部,则放在框内部下方
if text_y < 0:
text_y = y2 + text_height + 5
temp_img = frame_copy.copy()
frame_copy = put_chinese_text(
temp_img,
# label, # 置信度、类别、用作测试
# "", # 注释掉汉字
label_name, # 仅显示汉字
(text_x, text_y - 40),
)
else: # 超限使能且识别到了超限发生
print("超限使能且识别到了超限发生")
cls_count += 1
target_point.append({
"u": point_x,
"v": point_y,
"cls_id": cls_id,
"track_id": track_id,
"new_track_id": new_track_id
}) # 对于侵限,只存储侵限目标
# model_list_func_id = model_para[0]["model_list_func_id"]
# model_func_id = model_para[0]["func_id"]
message_point.append({
"confidence": float(confidence),
"cls_id": cls_id,
"type_name": en_name,
"track_id": track_id,
"box": [x1, y1, x2, y2]
})
label = f"{en_name}:{confidence:.2f}:{track_id}"
label_name = f"{en_name}"
# 计算文本位置
text_size = \
cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, fontScale=8, thickness=4)[
0]
text_width, text_height = text_size[0], text_size[1]
text_x = x1
text_y = y1 - 5
# 如果文本超出图像顶部,则放在框内部下方 # 如果文本超出图像顶部,则放在框内部下方
if text_y < 0: if text_y < 0:
text_y = y2 + text_height + 5 text_y = y2 + text_height + 5
@ -2734,12 +2821,18 @@ async def write_results_to_rtmp(task_id: str, output_url: str = None, input_fps:
label_name, # 仅显示汉字 label_name, # 仅显示汉字
(text_x, text_y - 40), (text_x, text_y - 40),
) )
# 超限画全图且只统计超限,当前为画全图
if invade_state: if invade_state:
for point in message_point: for point in message_point:
cv2.rectangle(frame_copy, (point["box"][0], point["box"][1]), cv2.rectangle(frame_copy, (point["box"][0], point["box"][1]),
(point["box"][2], point["box"][3]), (point["box"][2], point["box"][3]),
(0, 255, 255), 2) (0, 255, 255), 2)
if not invade_switch_enable:# 侵限使能,只关注超限的情况下,将侵限画另一个颜色
for point in invade_point_message_point:
cv2.rectangle(frame_copy, (point["box"][0], point["box"][1]),
(point["box"][2], point["box"][3]),
(0, 0, 255), 2)
# 画红线 # 画红线
# 在左上角显示统计结果 # 在左上角显示统计结果
stats_text = [] stats_text = []
@ -3023,7 +3116,7 @@ def haversine(lon1, lat1, lon2, lat2):
async def cal_des_invade(loop, invade_executor, task_id: str, mqtt, mqtt_publish_topic, async def cal_des_invade(loop, invade_executor, task_id: str, mqtt, mqtt_publish_topic,
list_points: list[list[any]], camera_para: Camera_Para, model_count: int, list_points: list[list[any]], camera_para: Camera_Para, model_count: int,
cancel_flag: asyncio.Event = None, invade_queue: asyncio.Queue = None, cancel_flag: asyncio.Event = None, invade_switch: int = 0, invade_queue: asyncio.Queue = None,
event_queue: asyncio.Queue = None, event_queue: asyncio.Queue = None,
device_height: float = float(200), repeat_dis: float = -1, repeat_time: float = -1): device_height: float = float(200), repeat_dis: float = -1, repeat_time: float = -1):
# loop = asyncio.get_running_loop() # loop = asyncio.get_running_loop()
@ -3236,6 +3329,7 @@ async def cal_des_invade(loop,invade_executor, task_id: str, mqtt, mqtt_publish_
"minio": {"minio_path": minio_path, "minio": {"minio_path": minio_path,
"minio_origin_path": minio_origin_path, "minio_origin_path": minio_origin_path,
"file_type": file_type}, "file_type": file_type},
"invade_switch":invade_switch,
"box_detail": [{ "box_detail": [{
"model_id": model_func_id, "model_id": model_func_id,
"cls_count": cls_count, "cls_count": cls_count,
@ -3299,7 +3393,8 @@ invade_cache_lock = Lock() # 用于保护共享变量的锁
async def send_frame_to_s3_mq(loop, upload_executor, task_id, mqtt, mqtt_topic, cancel_flag: asyncio.Event, async def send_frame_to_s3_mq(loop, upload_executor, task_id, mqtt, mqtt_topic, cancel_flag: asyncio.Event,
cv_frame_queue: asyncio.Queue, cv_frame_queue: asyncio.Queue,
event_queue: asyncio.Queue = None, event_queue: asyncio.Queue = None,
device_height: float = float(200), repeat_dis: float = -1, repeat_time: float = -1,high_count_warn: float = -1): device_height: float = float(200), repeat_dis: float = -1, repeat_time: float = -1,
high_count_warn: float = -1):
global stats global stats
start_time = time.time() start_time = time.time()
# executor = ThreadPoolExecutor(max_workers=Config.MAX_WORKERS) # executor = ThreadPoolExecutor(max_workers=Config.MAX_WORKERS)
@ -3412,7 +3507,6 @@ async def send_frame_to_s3_mq(loop,upload_executor,task_id, mqtt, mqtt_topic, ca
local_key_id_list.append(cls_id) local_key_id_list.append(cls_id)
local_key_count_list.append(1) local_key_count_list.append(1)
# should_report = True # should_report = True
# # 如果这个track_id已经上报过检查是否超过上报间隔 # # 如果这个track_id已经上报过检查是否超过上报间隔
@ -3475,7 +3569,6 @@ async def send_frame_to_s3_mq(loop,upload_executor,task_id, mqtt, mqtt_topic, ca
des1_latitude = des1[1] des1_latitude = des1[1]
des1_height = des1[2] des1_height = des1[2]
str_loca = f"{des1_back_longitude}:{des1_back_latitude}---{des1_longitude}{des1_latitude}" str_loca = f"{des1_back_longitude}:{des1_back_latitude}---{des1_longitude}{des1_latitude}"
des = haversine(des1_back_longitude, des1_back_latitude, des1_longitude, des1_latitude) des = haversine(des1_back_longitude, des1_back_latitude, des1_longitude, des1_latitude)
show_des = des show_des = des
@ -3568,6 +3661,7 @@ async def send_frame_to_s3_mq(loop,upload_executor,task_id, mqtt, mqtt_topic, ca
"longitude": cam_longitude, "longitude": cam_longitude,
"latitude": cam_latitude "latitude": cam_latitude
}, },
"invade_switch": 0, #默认
"count_message": count_message, "count_message": count_message,
"high_count_warn": { "high_count_warn": {
"high_count_warn_status": high_count_warn_status, "high_count_warn_status": high_count_warn_status,
@ -4001,7 +4095,7 @@ async def start_rtmp_processing(video_url: str, task_id: str, model_configs: Lis
mqtt_pub_ip: str, mqtt_pub_port: int, mqtt_pub_topic: str, mqtt_pub_ip: str, mqtt_pub_port: int, mqtt_pub_topic: str,
mqtt_sub_ip: str, mqtt_sub_port: int, mqtt_sub_topic: str, mqtt_sub_ip: str, mqtt_sub_port: int, mqtt_sub_topic: str,
output_rtmp_url: str, output_rtmp_url: str,
invade_enable: bool, invade_file: str, camera_para_url: str, invade_enable: bool, invade_switch: int, invade_file: str, camera_para_url: str,
device_height: float, repeat_dis: float, repeat_time: float, high_count_warn: float): device_height: float, repeat_dis: float, repeat_time: float, high_count_warn: float):
# 初始化资源 # 初始化资源
# await initialize_resources() # await initialize_resources()
@ -4128,6 +4222,7 @@ async def start_rtmp_processing(video_url: str, task_id: str, model_configs: Lis
list_points, list_points,
camera_para, camera_para,
invade_state, invade_state,
invade_switch,
cancel_flag, cancel_flag,
processed_queue, processed_queue,
invade_queue, invade_queue,
@ -4152,6 +4247,7 @@ async def start_rtmp_processing(video_url: str, task_id: str, model_configs: Lis
camera_para, camera_para,
model_count, model_count,
cancel_flag, cancel_flag,
invade_switch,
invade_queue, invade_queue,
event_queue, event_queue,
device_height, device_height,
@ -4271,7 +4367,8 @@ async def start_rtmp_processing(video_url: str, task_id: str, model_configs: Lis
async def start_video_processing(minio_path: str, task_id: str, model_configs: List[Dict], async def start_video_processing(minio_path: str, task_id: str, model_configs: List[Dict],
mqtt_ip: str, mqtt_port: int, mqtt_topic: str, output_rtmp_url: str, mqtt_ip: str, mqtt_port: int, mqtt_topic: str, output_rtmp_url: str,
invade_enable: bool, invade_file: str, camera_para_url: str, device_height: float, invade_enable: bool, invade_switch: int, invade_file: str, camera_para_url: str,
device_height: float,
repeat_dis: float, repeat_time: float): repeat_dis: float, repeat_time: float):
# global stop_event, frame_queue, processed_queue, executor, upload_executor # global stop_event, frame_queue, processed_queue, executor, upload_executor
# await initialize_resources() # 初始化资源 # await initialize_resources() # 初始化资源
@ -4424,6 +4521,7 @@ async def start_video_processing(minio_path: str, task_id: str, model_configs: L
list_points, list_points,
camera_para, camera_para,
invade_state, invade_state,
invade_switch,
cancel_flag, cancel_flag,
processed_queue, processed_queue,
invade_queue, invade_queue,
@ -4450,6 +4548,7 @@ async def start_video_processing(minio_path: str, task_id: str, model_configs: L
camera_para, camera_para,
model_count, model_count,
cancel_flag, cancel_flag,
invade_switch,
invade_queue, invade_queue,
event_queue, event_queue,
device_height, device_height,

View File

@ -718,6 +718,9 @@ async def run_back_Multi_Detect_async(request, request_json, stop_event: asyncio
invade = request_json.content_body.invade invade = request_json.content_body.invade
invade_file = invade["invade_file"] invade_file = invade["invade_file"]
camera_para_url = invade["camera_para_url"] camera_para_url = invade["camera_para_url"]
invade_switch = 0
if invade["invade_switch"] is not None:
invade_switch = invade["invade_switch"]
# dao.get_mqtt_config_by_orgcode(org_code,) # dao.get_mqtt_config_by_orgcode(org_code,)
str_request = str(request) + "&" + str(request.socket) # 待测试看看公网能不能捕获到请求端ip str_request = str(request) + "&" + str(request.socket) # 待测试看看公网能不能捕获到请求端ip
dao.insert_request_log(task_id, sn, org_code, str(request.body), str_request) dao.insert_request_log(task_id, sn, org_code, str(request.body), str_request)
@ -768,7 +771,7 @@ async def run_back_Multi_Detect_async(request, request_json, stop_event: asyncio
mqtt_pub_ip, mqtt_pub_port, mqtt_pub_topic, mqtt_pub_ip, mqtt_pub_port, mqtt_pub_topic,
mqtt_sub_ip, mqtt_sub_port, mqtt_sub_topic, mqtt_sub_ip, mqtt_sub_port, mqtt_sub_topic,
push_url, push_url,
invade_enable, invade_file, camera_para_url, invade_enable,invade_switch, invade_file, camera_para_url,
device_height, repeat_dis, repeat_time,high_count_warn device_height, repeat_dis, repeat_time,high_count_warn
) )
except Exception as e: except Exception as e:
@ -951,10 +954,12 @@ async def run_back_Video_Multi_Detect_async(request, request_json):
invade = request_json.content_body.invade invade = request_json.content_body.invade
invade_file = invade["invade_file"] invade_file = invade["invade_file"]
camera_para_url = invade["camera_para_url"] camera_para_url = invade["camera_para_url"]
invade_switch = 0
if invade["invade_switch"] is not None:
invade_switch = invade["invade_switch"]
await start_video_processing(minio_file_path, task_id, model_configs, mqtt_pub_ip, mqtt_pub_port, await start_video_processing(minio_file_path, task_id, model_configs, mqtt_pub_ip, mqtt_pub_port,
mqtt_pub_topic, push_url, mqtt_pub_topic, push_url,
invade_enable, invade_file, camera_para_url, device_height, repeat_dis, invade_enable,invade_switch, invade_file, camera_para_url, device_height, repeat_dis,
repeat_time) repeat_time)
except Exception as e: except Exception as e: