246 lines
9.7 KiB
Python
246 lines
9.7 KiB
Python
# func命名规范,以func开头,以postgres 中ai_model_list表的id 结尾。针对识别结果做二次功能计算
|
||
import numpy as np
|
||
|
||
|
||
# 人员聚集或者车辆聚集的计算方法,中心点互相间的距离小于最小对角线2倍,即为聚集
|
||
# coordinate_boxes 就是坐标集合
|
||
# 需要注意coordinate_boxes 的坐标集合,分别是 x1,y1,x2,y2
|
||
def detect_crowd(coordinate_boxes, N=3, threshold_factor=2.0):
|
||
# 计算中心点和对角线长度
|
||
centers = []
|
||
diagonals = []
|
||
|
||
for box in coordinate_boxes:
|
||
x1, y1, x2, y2 = box
|
||
cx = (x1 + x2) / 2
|
||
cy = (y1 + y2) / 2
|
||
centers.append((cx, cy))
|
||
width = x2 - x1
|
||
height = y2 - y1
|
||
diagonal = np.sqrt(width ** 2 + height ** 2)
|
||
diagonals.append(diagonal)
|
||
|
||
# 构建邻接矩阵
|
||
n = len(centers)
|
||
adjacency_matrix = np.zeros((n, n), dtype=bool)
|
||
for i in range(n):
|
||
for j in range(i + 1, n):
|
||
dist = np.sqrt((centers[i][0] - centers[j][0]) ** 2 + (centers[i][1] - centers[j][1]) ** 2)
|
||
threshold = threshold_factor * min(diagonals[i], diagonals[j])
|
||
if dist < threshold:
|
||
adjacency_matrix[i][j] = True
|
||
adjacency_matrix[j][i] = True
|
||
|
||
# 查找聚集群组
|
||
visited = [False] * n
|
||
crowd_groups = []
|
||
for i in range(n):
|
||
if not visited[i]:
|
||
stack = [i]
|
||
group = []
|
||
while stack:
|
||
person = stack.pop()
|
||
if not visited[person]:
|
||
visited[person] = True
|
||
group.append(person)
|
||
for neighbor in range(n):
|
||
if adjacency_matrix[person][neighbor] and not visited[neighbor]:
|
||
stack.append(neighbor)
|
||
if len(group) >= N:
|
||
crowd_groups.append(group)
|
||
|
||
return len(crowd_groups) > 0, crowd_groups
|
||
|
||
|
||
# # 示例测试 前两个点事左上角的x、y,后两个点是右下角x、y
|
||
# person_boxes = [
|
||
# [100, 100, 150, 200], # 人1
|
||
# [110, 220, 190, 420], # 人2(与人1聚集)
|
||
# #
|
||
# # [150, 220, 210, 420], # 人2(与人1聚集)
|
||
# # [190, 220, 230, 420], # 人2(与人1聚集)
|
||
# # [230, 220, 250, 420], # 人2(与人1聚集)
|
||
# [400, 300, 350, 400], # 人3(孤立)
|
||
# ]
|
||
# has_crowd, groups = detect_crowd(person_boxes, N=3)
|
||
|
||
|
||
# 通用方法,发现即记录事件
|
||
def func_100000(results, cls_id_list, type_name_list, func_id_10001, list_track_id,func_id=-1):
|
||
box_count = []
|
||
draw_detail = []
|
||
cls_count = 0
|
||
result_boxes = results.boxes
|
||
if result_boxes is not None:
|
||
|
||
# result_boxes= results.boxes
|
||
|
||
# yolo11 的bug,检测到结果就是results.boxes 是 torch.Tensor(包含边界框坐标);当没有检测到目标时,results.boxes 可能被设置为空列表 [],导致类型变为 list。
|
||
# yolo11 使用二次修改后的值,本身就是list
|
||
if isinstance(result_boxes, list) and len(result_boxes) == 0:
|
||
return None
|
||
if isinstance(result_boxes, list) and len(result_boxes) >0:
|
||
boxes = result_boxes
|
||
else:
|
||
boxes = result_boxes.tolist()
|
||
|
||
for i, box in enumerate(boxes):
|
||
# print(boxes[i])
|
||
# print(results.confs[i])
|
||
# print(results.clss[i])
|
||
if results.clss[i] in cls_id_list:
|
||
cls_count = cls_count + 1
|
||
ind = int(results.clss[i])
|
||
trickier_detail = {
|
||
# "track_id": results.track_ids[i],
|
||
"confidence": results.confs[i],
|
||
"cls_id": i,
|
||
"type_name": type_name_list[ind],
|
||
"box": boxes[i]
|
||
}
|
||
draw_detail.append(trickier_detail)
|
||
if len(draw_detail) > 0:
|
||
box_count.append(draw_detail)
|
||
|
||
if func_id>0:
|
||
func_id_10001=func_id
|
||
|
||
if len(box_count) > 0:
|
||
cal_result = {
|
||
"model_id": func_id_10001,
|
||
"type_name": type_name_list,
|
||
"cls_count": cls_count,
|
||
"box_count": box_count
|
||
}
|
||
# message_json = json.dumps(cal_result, indent=4)
|
||
return cal_result
|
||
else:
|
||
return None
|
||
|
||
|
||
# func_10004 、 func_100005,人员、车辆、视频、图片 使用同一套逻辑,方法可以复用
|
||
# 逻辑为 中心点互相间的距离小于对角线2倍,即判断为聚集
|
||
# local_cache 为一个包含track_id的list,有变化就更新
|
||
def func_100004(results, cls_id_list, type_name_list, func_id_10004, N, local_cache):
|
||
# try:
|
||
box_count = []
|
||
draw_detail = []
|
||
cls_count = 0
|
||
coordinate_boxes = [] # 所有的 box的坐标集合
|
||
coordinate_track_id = [] # 所有的track_id
|
||
# crowd_coordinate_list=[] # 识别后的聚集的坐标的集合
|
||
# crowd_track_id_list=[] #识别后的track_id的集合
|
||
l_cache = [] # track_id的集合
|
||
if results.boxes is not None:
|
||
result_boxes = results.boxes
|
||
# yolo11 的bug,检测到结果就是results.boxes 是 torch.Tensor(包含边界框坐标);当没有检测到目标时,results.boxes 可能被设置为空列表 [],导致类型变为 list。
|
||
if isinstance(result_boxes, list) or len(result_boxes) == 0:
|
||
return None,None
|
||
boxes = result_boxes.tolist()
|
||
# if boxes is None:
|
||
# print("boxes is None")
|
||
|
||
for i, box in enumerate(boxes):
|
||
ind = int(results.clss[i])
|
||
if ind in cls_id_list:
|
||
cls_count = cls_count + 1
|
||
# x1, y1, x2, y2 = map(int, box.xyxy[0].tolist())
|
||
x1 = box[0]
|
||
y1 = box[1]
|
||
x2 = box[2]
|
||
y2 = box[3]
|
||
track_id = results.track_ids[i]
|
||
l_cache.append(track_id)
|
||
trickier_detail = {
|
||
"track_id": track_id,
|
||
"confidence": results.confs[i],
|
||
"cls_id": ind,
|
||
"type_name": type_name_list[ind], # 伪代码,后续需要修改
|
||
"box": boxes[i]
|
||
}
|
||
coordinate_boxes.append([x1, y1, x2, y2])
|
||
coordinate_track_id.append(track_id)
|
||
draw_detail.append(trickier_detail)
|
||
if len(draw_detail) > 0:
|
||
box_count.append(draw_detail)
|
||
if l_cache is not None and local_cache is not None and set(l_cache) == set(local_cache):
|
||
# if set(l_cache) == set(local_cache): # 判断元素是否相等,如果相等,就表示人员、车辆没变化
|
||
return None, None
|
||
if len(box_count) > 0:
|
||
cal_result = {
|
||
"model_id": func_id_10004,
|
||
"type_name": type_name_list,
|
||
"cls_count": cls_count,
|
||
"box_count": box_count
|
||
}
|
||
|
||
return cal_result, local_cache
|
||
else:
|
||
return None, local_cache
|
||
|
||
|
||
# except Exception as infer_err:
|
||
# print(f"推理错误1111: {infer_err}")
|
||
|
||
|
||
# func_10004 、 func_100005,人员、车辆、视频、图片 使用同一套逻辑,方法可以复用
|
||
# 逻辑为 中心点互相间的距离小于对角线2倍,即判断为聚集
|
||
# local_cache 为一个包含track_id的list,有变化就更新
|
||
def func_100021(results, cls_id_list, type_name_list, func_id_10004, N):
|
||
box_count = []
|
||
draw_detail = []
|
||
cls_count = 0
|
||
coordinate_boxes = [] # 所有的 box的坐标集合
|
||
coordinate_track_id = [] # 所有的track_id
|
||
crowd_coordinate_list = [] # 识别后的聚集的坐标的集合
|
||
crowd_track_id_list = [] # 识别后的track_id的集合
|
||
if results.boxes is not None:
|
||
result_boxes = results.boxes
|
||
# yolo11 的bug,检测到结果就是results.boxes 是 torch.Tensor(包含边界框坐标);当没有检测到目标时,results.boxes 可能被设置为空列表 [],导致类型变为 list。
|
||
if isinstance(result_boxes, list) or len(result_boxes) == 0:
|
||
return None
|
||
boxes = result_boxes.tolist()
|
||
for i, box in enumerate(boxes):
|
||
ind = int(results.clss[i])
|
||
if ind in cls_id_list:
|
||
cls_count = cls_count + 1
|
||
# x1, y1, x2, y2 = map(int, box.xyxy[0].tolist())
|
||
x1 = box[0]
|
||
y1 = box[1]
|
||
x2 = box[2]
|
||
y2 = box[3]
|
||
track_id = results.track_ids[i]
|
||
trickier_detail = {
|
||
"track_id": track_id,
|
||
"confidence": results.confs[i],
|
||
"cls_id": ind,
|
||
"type_name": type_name_list[ind], # 伪代码,后续需要修改
|
||
"box": boxes[i]
|
||
}
|
||
coordinate_boxes.append([x1, y1, x2, y2])
|
||
coordinate_track_id.append(track_id)
|
||
draw_detail.append(trickier_detail)
|
||
if len(draw_detail) > 0:
|
||
box_count.append(draw_detail)
|
||
if len(coordinate_boxes) > 0:
|
||
detect_result, detect_list = detect_crowd(coordinate_boxes, N)
|
||
if detect_result:
|
||
if detect_list is not None:
|
||
for i, value in enumerate(detect_list):
|
||
crowd_coordinate_list.append(coordinate_boxes[i])
|
||
crowd_track_id_list.append(coordinate_track_id[i])
|
||
if len(box_count) > 0:
|
||
cal_result = {
|
||
"model_id": func_id_10004,
|
||
"type_name": type_name_list,
|
||
"cls_count": cls_count,
|
||
"box_count": box_count
|
||
}
|
||
if len(crowd_coordinate_list) > 0:
|
||
cal_result["crowd_coordinate_list"] = crowd_coordinate_list
|
||
cal_result["crowd_track_id_list"] = crowd_track_id_list
|
||
# message_json = json.dumps(cal_result, indent=4)
|
||
return cal_result
|
||
else:
|
||
return None
|