This commit is contained in:
yooooger 2025-07-23 14:15:31 +08:00
parent 470e502704
commit 6d419c9369
11 changed files with 651 additions and 0 deletions

Binary file not shown.

BIN
ai_for_out/GDCL.pt Normal file

Binary file not shown.

BIN
ai_for_out/HWRC.pt Normal file

Binary file not shown.

BIN
ai_for_out/best.pt Normal file

Binary file not shown.

603
ai_for_out/cv_video.py Normal file
View File

@ -0,0 +1,603 @@
from threading import Thread, Lock, Event
import time
import queue
from ultralytics import YOLO # 导入 Ultralytics YOLO 模型
import os, cv2, torch, time, queue, subprocess
import numpy as np
# 全局变量
ifAI = {'status': False}
deskLock = Lock()
frame_queue = queue.Queue(maxsize=60) # 增加帧缓冲队列大小
processed_frame_queue = queue.Queue(maxsize=30) # 处理后的帧队列
stop_event = Event()
def setIfAI(pb1):
deskLock.acquire()
ifAI['status'] = pb1
deskLock.release()
def getIfAI():
return ifAI['status']
def stopAIVideo():
print("正在停止AI视频处理...")
setIfAI(False)
stop_event.set()
# 等待足够长的时间确保资源释放
wait_count = 0
max_wait = 5 # 减少最大等待时间到5秒
while stop_event.is_set() and wait_count < max_wait:
time.sleep(0.5)
wait_count += 1
if wait_count >= max_wait:
print("警告: 停止AI视频处理超时强制终止")
# 不使用_thread._interrupt_main(),改用其他方式强制终止
try:
# 尝试终止可能运行的进程
import os
import signal
import psutil
# 查找并终止可能的FFmpeg进程
current_process = psutil.Process(os.getpid())
for child in current_process.children(recursive=True):
try:
child_name = child.name().lower()
if 'ffmpeg' in child_name:
print(f"正在终止子进程: {child.pid} ({child_name})")
child.send_signal(signal.SIGTERM)
except:
pass
except:
pass
else:
print("AI视频处理已停止")
def startAIVideo(video_path, output_url, m1, cls, confidence):
if ifAI['status']:
stopAIVideo()
time.sleep(1)
stop_event.clear()
thread = Thread(target=startAIVideo2,
args=(video_path, output_url, m1, cls, confidence))
# cls2_thread = Thread(target=cls2_find, args=(video_path,m1, cls, confidence))
# cls2_thread.daemon = True # 守护线程,主程序退出时线程也会退出
thread.daemon = True # 守护线程,主程序退出时线程也会退出
thread.start()
# cls2_thread.start()
def read_frames(cap, frame_queue):
"""优化的帧读取线程"""
frame_count = 0
last_time = time.time()
last_fps_time = time.time()
# 减小目标帧间隔时间,提高读取帧率
target_time_per_frame = 1.0 / 60.0 # 目标帧间隔时间提高到60fps
# 添加连接断开检测
connection_error_count = 0
max_connection_errors = 10 # 最多允许连续10次连接错误
last_successful_read = time.time()
max_read_wait = 30.0 # 30秒无法读取则认为连接断开
# 预先丢弃几帧,确保从新帧开始处理
for _ in range(5):
cap.grab()
while not stop_event.is_set():
current_time = time.time()
elapsed_time = current_time - last_time
# 检查是否长时间无法读取帧
if current_time - last_successful_read > max_read_wait:
print(f"警告: {max_read_wait}秒内未能读取到有效帧,可能连接已断开")
stop_event.set()
break
# 帧率控制,但更积极地读取
if elapsed_time < target_time_per_frame:
time.sleep(target_time_per_frame - elapsed_time)
continue
# 当队列快满时,跳过一些帧以避免延迟累积
if frame_queue.qsize() > frame_queue.maxsize * 0.8:
# 跳过一些帧
cap.grab()
last_time = time.time()
continue
ret, frame = cap.read()
if not ret:
print("拉流错误:无法读取帧")
connection_error_count += 1
if connection_error_count >= max_connection_errors:
print(f"连续{max_connection_errors}次无法读取帧,可能连接已断开,正在停止流程...")
stop_event.set()
break
time.sleep(0.5) # 短暂等待后重试
continue
# 成功读取了帧,重置错误计数
connection_error_count = 0
last_successful_read = time.time()
frame_count += 1
if frame_count % 60 == 0: # 每60帧计算一次FPS
current_fps_time = time.time()
fps = 60 / (current_fps_time - last_fps_time)
print(f"拉流FPS: {fps:.2f}")
last_fps_time = current_fps_time
last_time = time.time()
frame_queue.put((frame, time.time())) # 添加时间戳
def process_frames(frame_queue, processed_frame_queue, ov_model, cls, confidence):
"""处理帧的线程,添加帧率控制"""
error_count = 0 # 添加错误计数器
max_errors = 5 # 最大容许错误次数
frame_count = 0
process_times = [] # 用于计算平均处理时间
# 设置YOLO模型配置提高性能
ov_model.conf = confidence # 设置置信度阈值
# 优化推理性能
try:
# 导入torch库
import torch
# 尝试启用ONNX Runtime加速
ov_model.to('cuda:0' if torch.cuda.is_available() else 'cpu')
# 调整批处理大小为1减少内存占用
if hasattr(ov_model, 'args') and hasattr(ov_model.args, 'batch'):
ov_model.args.batch = 1
# 使用half精度提高性能
if torch.cuda.is_available() and hasattr(ov_model, 'model'):
try:
ov_model.model = ov_model.model.half()
except Exception as half_err:
print(f"半精度转换失败: {half_err}")
except Exception as e:
print(f"模型优化配置警告: {e}")
# 缓存先前的检测结果,用于提高稳定性
last_results = None
skip_counter = 0
max_skip = 2 # 最多跳过几帧不处理
while not stop_event.is_set():
try:
if processed_frame_queue.qsize() >= processed_frame_queue.maxsize * 0.8:
# 如果输出队列接近满,等待一段时间
time.sleep(0.01)
continue
frame, timestamp = frame_queue.get(timeout=0.2)
# 处理延迟过大的帧
if time.time() - timestamp > 0.3: # 减少延迟阈值
continue
frame_count += 1
# 间隔采样每n帧处理一次减少计算量
if skip_counter > 0 and last_results is not None:
skip_counter -= 1
# 使用上次的检测结果
annotated_frame = last_results.plot(conf=False, line_width=1, font_size=1.5)
processed_frame_queue.put((annotated_frame, timestamp))
continue
process_start = time.time()
# 动态调整处理尺寸,根据队列积压情况
resize_scale = 1.0
if frame_queue.qsize() > frame_queue.maxsize * 0.7:
resize_scale = 0.4 # 高负载时大幅降低分辨率
elif frame_queue.qsize() > frame_queue.maxsize * 0.5:
resize_scale = 0.6 # 中等负载时适度降低分辨率
elif frame_queue.qsize() > frame_queue.maxsize * 0.3:
resize_scale = 0.8 # 轻微负载时轻微降低分辨率
# 调整图像尺寸以加快处理
if resize_scale < 1.0:
process_frame = cv2.resize(frame, (0, 0), fx=resize_scale, fy=resize_scale)
else:
process_frame = frame
# 执行推理
try:
results = ov_model(process_frame, classes=cls, show=False)
last_results = results[0] # 保存检测结果用于后续帧
# 如果尺寸调整过,需要将结果转换回原始尺寸
if resize_scale < 1.0:
# 绘制检测框
annotated_frame = cv2.resize(results[0].plot(conf=False, line_width=1, font_size=1.5),
(frame.shape[1], frame.shape[0]))
else:
annotated_frame = results[0].plot(conf=False, line_width=1, font_size=1.5)
# 在负载高时启用跳帧处理
if frame_queue.qsize() > frame_queue.maxsize * 0.5:
skip_counter = max_skip
except Exception as infer_err:
print(f"推理错误: {infer_err}")
if last_results is not None:
# 使用上次的结果
annotated_frame = last_results.plot(conf=False, line_width=1, font_size=1.5)
else:
# 如果没有上次的结果,简单返回原始帧
annotated_frame = frame.copy()
process_end = time.time()
process_times.append(process_end - process_start)
if len(process_times) > 30:
process_times.pop(0)
if frame_count % 30 == 0:
avg_process_time = sum(process_times) / len(process_times)
fps = 1.0 / avg_process_time if avg_process_time > 0 else 0
print(f"模型处理FPS: {fps:.2f}, 平均处理时间: {avg_process_time*1000:.2f}ms, 队列大小: {frame_queue.qsize()}, 缩放比例: {resize_scale:.2f}")
processed_frame_queue.put((annotated_frame, timestamp))
error_count = 0 # 成功处理后重置错误计数
except queue.Empty:
continue
except Exception as e:
error_count += 1
print(f"处理帧错误: {e}")
if error_count >= max_errors:
print(f"连续处理错误达到{max_errors}次,正在停止处理...")
stop_event.set()
break
def write_frames(processed_frame_queue, pipe, size):
"""写入帧的线程,添加平滑处理"""
last_write_time = time.time()
target_time_per_frame = 1.0 / 30.0 # 30fps
pipe_error_count = 0 # 添加错误计数
max_pipe_errors = 3 # 最大容忍错误数
frame_count = 0
last_fps_time = time.time()
skipped_frames = 0
# 使用队列存储最近几帧,用于在需要时进行插值
recent_frames = []
max_recent_frames = 5 # 增加缓存帧数量,提高平滑性
# 使用双缓冲机制提高写入速度
buffer1 = bytearray(size[0] * size[1] * 3)
buffer2 = bytearray(size[0] * size[1] * 3)
current_buffer = buffer1
# 帧率控制参数
min_frame_interval = target_time_per_frame * 0.5 # 允许的最小帧间隔
max_frame_interval = target_time_per_frame * 2.0 # 允许的最大帧间隔
while not stop_event.is_set():
try:
# 获取处理后的帧,超时时间较短以便更平滑地处理
frame, timestamp = processed_frame_queue.get(timeout=0.05)
# 存储最近的帧用于插值
recent_frames.append(frame)
if len(recent_frames) > max_recent_frames:
recent_frames.pop(0)
current_time = time.time()
elapsed = current_time - last_write_time
# 如果两帧间隔太短,考虑合并或跳过
if elapsed < min_frame_interval and len(recent_frames) >= 2:
skipped_frames += 1
continue
# 如果两帧间隔太长,进行插值平滑
if elapsed > max_frame_interval and len(recent_frames) >= 2:
# 计算需要插入的帧数
frames_to_insert = min(int(elapsed / target_time_per_frame), 3)
for i in range(frames_to_insert):
# 创建插值帧
weight = (i + 1) / (frames_to_insert + 1)
interpolated_frame = cv2.addWeighted(recent_frames[-2], 1-weight, recent_frames[-1], weight, 0)
# 切换双缓冲
current_buffer = buffer2 if current_buffer is buffer1 else buffer1
# 高效调整大小并写入
interpolated_resized = cv2.resize(interpolated_frame, size, interpolation=cv2.INTER_LINEAR)
img_bytes = interpolated_resized.tobytes()
# 写入管道
pipe.stdin.write(img_bytes)
pipe.stdin.flush()
# 切换双缓冲
current_buffer = buffer2 if current_buffer is buffer1 else buffer1
# 正常写入当前帧 - 使用高效的调整大小方法
resized_frame = cv2.resize(frame, size, interpolation=cv2.INTER_LINEAR)
img_bytes = resized_frame.tobytes()
pipe.stdin.write(img_bytes)
pipe.stdin.flush()
frame_count += 1
if frame_count % 30 == 0:
current_fps_time = time.time()
fps = 30 / (current_fps_time - last_fps_time)
print(f"推流FPS: {fps:.2f}, 跳过的帧: {skipped_frames}, 队列大小: {processed_frame_queue.qsize()}")
last_fps_time = current_fps_time
skipped_frames = 0
last_write_time = time.time()
pipe_error_count = 0 # 成功写入后重置错误计数
except queue.Empty:
# 队列为空且有足够的最近帧时,考虑生成插值帧以保持流畅
if len(recent_frames) >= 2 and time.time() - last_write_time > target_time_per_frame:
try:
# 创建插值帧
interpolated_frame = cv2.addWeighted(recent_frames[-2], 0.5, recent_frames[-1], 0.5, 0)
# 切换双缓冲
current_buffer = buffer2 if current_buffer is buffer1 else buffer1
resized_frame = cv2.resize(interpolated_frame, size, interpolation=cv2.INTER_LINEAR)
img_bytes = resized_frame.tobytes()
pipe.stdin.write(img_bytes)
pipe.stdin.flush()
last_write_time = time.time()
except Exception:
pass
continue
except Exception as e:
print(f"写入帧错误: {e}")
pipe_error_count += 1
if pipe_error_count >= max_pipe_errors:
print("FFmpeg管道错误过多正在终止进程...")
stop_event.set() # 主动结束所有线程
break
def startAIVideo2(video_path, output_url, m1, cls, confidence):
rtmp = output_url
setIfAI(True)
cap = None
pipe = None
read_thread = None
process_thread = None
write_thread = None
ov_model = None
try:
global frame_queue, processed_frame_queue, stop_event
stop_event = Event()
os.environ["OMP_NUM_THREADS"] = "4"
os.environ["CUDA_VISIBLE_DEVICES"] = "0"
print(f"PyTorch 可用: {torch.__version__}, CUDA可用: {torch.cuda.is_available()}")
if torch.cuda.is_available():
print(f"GPU: {torch.cuda.get_device_name(0)}")
torch.cuda.empty_cache()
print("预加载YOLO模型...")
model_params = {}
try:
test_model = YOLO(m1)
if hasattr(test_model, "task"):
model_params["task"] = "detect"
if torch.cuda.is_available():
model_params["half"] = True
import inspect
if "verbose" in inspect.signature(YOLO.__init__).parameters:
model_params["verbose"] = False
except Exception as e:
print(f"参数检测失败,将使用默认参数: {e}")
model_params = {}
retry_count = 0
while retry_count < 3:
try:
ov_model = YOLO(m1, **model_params)
dummy_frame = np.zeros((1080, 1920, 3), dtype=np.uint8)
for _ in range(3):
ov_model(dummy_frame, classes=cls, conf=confidence, show=False)
print("YOLO模型加载成功并预热完成")
break
except Exception as e:
retry_count += 1
print(f"YOLO模型加载失败(尝试 {retry_count}/3): {e}")
if "unexpected keyword" in str(e):
param = str(e).split("'")[-2]
if param in model_params:
print(f"移除不支持的参数: {param}")
del model_params[param]
time.sleep(2)
if ov_model is None:
raise Exception("无法加载YOLO模型")
ov_model.conf = confidence
print(f"正在连接视频流: {video_path}")
cap = cv2.VideoCapture(video_path, cv2.CAP_FFMPEG)
cap.set(cv2.CAP_PROP_BUFFERSIZE, 5)
cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*'H264'))
cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1920)
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 1080)
if not cap.isOpened():
raise Exception(f"无法打开视频流: {video_path}")
try:
cap.set(cv2.CAP_PROP_AUTO_EXPOSURE, 0)
except Exception as e:
print(f"无法设置自动曝光参数: {e}")
frame_queue = queue.Queue(maxsize=80)
processed_frame_queue = queue.Queue(maxsize=40)
size = (1920, 1080)
sizeStr = f"{size[0]}x{size[1]}"
command = [
'ffmpeg', '-y',
'-f', 'rawvideo', '-vcodec', 'rawvideo',
'-pix_fmt', 'bgr24',
'-s', sizeStr,
'-r', '30',
'-i', '-',
'-c:v', 'libx264',
'-preset', 'ultrafast',
'-tune', 'zerolatency',
'-x264-params', 'sei=0',
'-pix_fmt', 'yuv420p',
'-f', 'flv',
'-g', '30',
'-keyint_min', '30',
'-sc_threshold', '0',
'-b:v', '2500k',
'-maxrate', '3000k',
'-bufsize', '3000k',
'-threads', '4',
'-vsync', '1',
rtmp
]
print(f"启动FFmpeg推流到: {rtmp}")
pipe = subprocess.Popen(command, shell=False, stdin=subprocess.PIPE,
stdout=subprocess.PIPE, stderr=subprocess.PIPE,
bufsize=10*1024*1024)
def monitor_ffmpeg_output(pipe):
while not stop_event.is_set():
line = pipe.stderr.readline().decode('utf-8', errors='ignore')
if line and ("error" in line.lower()):
print(f"FFmpeg错误: {line.strip()}")
if "Cannot open connection" in line:
stop_event.set()
break
Thread(target=monitor_ffmpeg_output, args=(pipe,), daemon=True).start()
def read_frames(cap, frame_queue):
while not stop_event.is_set():
ret, frame = cap.read()
if not ret:
print("读取失败")
break
timestamp = time.time()
try:
frame_queue.put((timestamp, frame), timeout=1)
except queue.Full:
print("帧队列满,跳帧")
def process_frames(frame_queue, processed_frame_queue, model, cls, confidence):
while not stop_event.is_set():
try:
timestamp, frame = frame_queue.get(timeout=1)
result = model.predict(source=frame, classes=cls, conf=confidence, verbose=False)
processed = result[0].plot()
processed_frame_queue.put((timestamp, processed), timeout=1)
except queue.Empty:
continue
except Exception as e:
print(f"处理帧错误: {e}")
def write_frames(processed_frame_queue, pipe, size):
last_timestamp = 0
while not stop_event.is_set():
try:
timestamp, frame = processed_frame_queue.get(timeout=1)
if timestamp < last_timestamp:
print(f"跳过闪回帧 {timestamp} < {last_timestamp}")
continue
last_timestamp = timestamp
frame = cv2.resize(frame, size)
pipe.stdin.write(frame.tobytes())
except Exception as e:
print(f"写入帧错误: {e}")
break
read_thread = Thread(target=read_frames, args=(cap, frame_queue), daemon=True, name="ReadThread")
process_thread = Thread(target=process_frames, args=(frame_queue, processed_frame_queue, ov_model, cls, confidence), daemon=True, name="ProcessThread")
write_thread = Thread(target=write_frames, args=(processed_frame_queue, pipe, size), daemon=True, name="WriteThread")
print("开始推流处理...")
read_thread.start()
process_thread.start()
write_thread.start()
last_check = time.time()
while getIfAI() and not stop_event.is_set():
if not all([t.is_alive() for t in [read_thread, process_thread, write_thread]]):
print("检测到线程停止,退出")
stop_event.set()
break
if pipe.poll() is not None:
print("FFmpeg退出")
stop_event.set()
break
if time.time() - last_check > 30:
print(f"输入队列: {frame_queue.qsize()}/{frame_queue.maxsize} | 输出队列: {processed_frame_queue.qsize()}/{processed_frame_queue.maxsize}")
last_check = time.time()
time.sleep(0.1)
except Exception as e:
print(f"错误: {e}")
finally:
print("清理资源...")
stop_event.set()
setIfAI(False)
for t in [read_thread, process_thread, write_thread]:
if t and t.is_alive():
t.join(timeout=2)
try:
if cap: cap.release()
if pipe:
try:
import signal
os.kill(pipe.pid, signal.SIGTERM)
except: pass
pipe.stdin.close()
pipe.terminate()
try:
pipe.wait(timeout=2)
except:
pipe.kill()
except Exception as e:
print(f"释放资源时出错: {e}")
try:
cv2.destroyAllWindows()
except:
pass
print("资源释放完毕")
if __name__ == '__main__':
sn = "1581F6QAD243C00BP71E"
video_path = f"rtmp://222.212.85.86:1935/live/{sn}"
# FFmpeg 推流地址
rtmp = f"rtmp://222.212.85.86:1935/live/{sn}ai"
try:
startAIVideo2(video_path, rtmp, "best.pt", [0, 1, 2, 3, 4],0.4)
except KeyboardInterrupt:
print("程序被用户中断")
stopAIVideo()
except Exception as e:
print(f"程序异常: {e}")

BIN
ai_for_out/fire.pt Normal file

Binary file not shown.

BIN
ai_for_out/gdaq.pt Normal file

Binary file not shown.

View File

@ -0,0 +1,48 @@
import os
from minio import Minio
from minio.error import S3Error
bucket="300bdf2b-a150-406e-be63-d28bd29b409f"
# 替换为你的MinIO服务器地址、访问密钥和秘密密钥
def getClient():
minio_client = Minio(
"222.212.85.86:9000",
access_key="WuRenJi",
secure=False,
secret_key="WRJ@2024",)
return minio_client
def getPath2(object):
#dir="C:/sy/movies/"
dir=os.getcwd()+"/"
baseName=object
s1=baseName.rfind("/")
dir2=(dir+baseName[0:s1+1]).replace("/","\\")
fName=baseName[s1+1:int(len(baseName))]
os.makedirs(dir2, exist_ok=True)
file_path = os.path.join(dir2, fName)
return file_path
def upLoad(obj,path):
try:
minio_client=getClient()
minio_client.fput_object(bucket, obj, path)
return True
except S3Error as e:
return False
def downLoad(obj):
path=getPath2(obj)
if os.path.exists(path):
return path
# 从MinIO的存储桶和对象名称下载
try:
minio_client=getClient()
minio_client.fget_object(bucket, obj, path)
return path
except S3Error as e:
return ""
if __name__ == '__main__':
upLoad("aaa/yolo_api.py","yolo_api.py")

BIN
ai_for_out/smoke.pt Normal file

Binary file not shown.

BIN
ai_for_out/trash.pt Normal file

Binary file not shown.

BIN
ai_for_out/yanwu2.pt Normal file

Binary file not shown.