diff --git a/ai2/last.pt b/ai2/last.pt deleted file mode 100644 index 497e15f..0000000 Binary files a/ai2/last.pt and /dev/null differ diff --git a/ai_for_out/GDCL.pt b/ai_for_out/GDCL.pt new file mode 100644 index 0000000..85ca803 Binary files /dev/null and b/ai_for_out/GDCL.pt differ diff --git a/ai_for_out/HWRC.pt b/ai_for_out/HWRC.pt new file mode 100644 index 0000000..c24c4e1 Binary files /dev/null and b/ai_for_out/HWRC.pt differ diff --git a/ai_for_out/best.pt b/ai_for_out/best.pt new file mode 100644 index 0000000..79f35ba Binary files /dev/null and b/ai_for_out/best.pt differ diff --git a/ai_for_out/cv_video.py b/ai_for_out/cv_video.py new file mode 100644 index 0000000..7a73c05 --- /dev/null +++ b/ai_for_out/cv_video.py @@ -0,0 +1,603 @@ +from threading import Thread, Lock, Event +import time +import queue +from ultralytics import YOLO # 导入 Ultralytics YOLO 模型 +import os, cv2, torch, time, queue, subprocess +import numpy as np + +# 全局变量 +ifAI = {'status': False} +deskLock = Lock() +frame_queue = queue.Queue(maxsize=60) # 增加帧缓冲队列大小 +processed_frame_queue = queue.Queue(maxsize=30) # 处理后的帧队列 +stop_event = Event() + +def setIfAI(pb1): + deskLock.acquire() + ifAI['status'] = pb1 + deskLock.release() + +def getIfAI(): + return ifAI['status'] + +def stopAIVideo(): + print("正在停止AI视频处理...") + setIfAI(False) + stop_event.set() + + # 等待足够长的时间确保资源释放 + wait_count = 0 + max_wait = 5 # 减少最大等待时间到5秒 + + while stop_event.is_set() and wait_count < max_wait: + time.sleep(0.5) + wait_count += 1 + + if wait_count >= max_wait: + print("警告: 停止AI视频处理超时,强制终止") + # 不使用_thread._interrupt_main(),改用其他方式强制终止 + try: + # 尝试终止可能运行的进程 + import os + import signal + import psutil + + # 查找并终止可能的FFmpeg进程 + current_process = psutil.Process(os.getpid()) + for child in current_process.children(recursive=True): + try: + child_name = child.name().lower() + if 'ffmpeg' in child_name: + print(f"正在终止子进程: {child.pid} ({child_name})") + child.send_signal(signal.SIGTERM) + except: + pass + except: + pass + else: + print("AI视频处理已停止") + +def startAIVideo(video_path, output_url, m1, cls, confidence): + if ifAI['status']: + stopAIVideo() + time.sleep(1) + stop_event.clear() + thread = Thread(target=startAIVideo2, + args=(video_path, output_url, m1, cls, confidence)) + # cls2_thread = Thread(target=cls2_find, args=(video_path,m1, cls, confidence)) + # cls2_thread.daemon = True # 守护线程,主程序退出时线程也会退出 + thread.daemon = True # 守护线程,主程序退出时线程也会退出 + + + thread.start() + # cls2_thread.start() + +def read_frames(cap, frame_queue): + """优化的帧读取线程""" + frame_count = 0 + last_time = time.time() + last_fps_time = time.time() + # 减小目标帧间隔时间,提高读取帧率 + target_time_per_frame = 1.0 / 60.0 # 目标帧间隔时间(提高到60fps) + + # 添加连接断开检测 + connection_error_count = 0 + max_connection_errors = 10 # 最多允许连续10次连接错误 + last_successful_read = time.time() + max_read_wait = 30.0 # 30秒无法读取则认为连接断开 + + # 预先丢弃几帧,确保从新帧开始处理 + for _ in range(5): + cap.grab() + + while not stop_event.is_set(): + current_time = time.time() + elapsed_time = current_time - last_time + + # 检查是否长时间无法读取帧 + if current_time - last_successful_read > max_read_wait: + print(f"警告: {max_read_wait}秒内未能读取到有效帧,可能连接已断开") + stop_event.set() + break + + # 帧率控制,但更积极地读取 + if elapsed_time < target_time_per_frame: + time.sleep(target_time_per_frame - elapsed_time) + continue + + # 当队列快满时,跳过一些帧以避免延迟累积 + if frame_queue.qsize() > frame_queue.maxsize * 0.8: + # 跳过一些帧 + cap.grab() + last_time = time.time() + continue + + ret, frame = cap.read() + if not ret: + print("拉流错误:无法读取帧") + connection_error_count += 1 + if connection_error_count >= max_connection_errors: + print(f"连续{max_connection_errors}次无法读取帧,可能连接已断开,正在停止流程...") + stop_event.set() + break + time.sleep(0.5) # 短暂等待后重试 + continue + + # 成功读取了帧,重置错误计数 + connection_error_count = 0 + last_successful_read = time.time() + + frame_count += 1 + if frame_count % 60 == 0: # 每60帧计算一次FPS + current_fps_time = time.time() + fps = 60 / (current_fps_time - last_fps_time) + print(f"拉流FPS: {fps:.2f}") + last_fps_time = current_fps_time + + last_time = time.time() + frame_queue.put((frame, time.time())) # 添加时间戳 + +def process_frames(frame_queue, processed_frame_queue, ov_model, cls, confidence): + """处理帧的线程,添加帧率控制""" + error_count = 0 # 添加错误计数器 + max_errors = 5 # 最大容许错误次数 + frame_count = 0 + process_times = [] # 用于计算平均处理时间 + + # 设置YOLO模型配置,提高性能 + ov_model.conf = confidence # 设置置信度阈值 + + # 优化推理性能 + try: + # 导入torch库 + import torch + # 尝试启用ONNX Runtime加速 + ov_model.to('cuda:0' if torch.cuda.is_available() else 'cpu') + # 调整批处理大小为1,减少内存占用 + if hasattr(ov_model, 'args') and hasattr(ov_model.args, 'batch'): + ov_model.args.batch = 1 + # 使用half精度,提高性能 + if torch.cuda.is_available() and hasattr(ov_model, 'model'): + try: + ov_model.model = ov_model.model.half() + except Exception as half_err: + print(f"半精度转换失败: {half_err}") + except Exception as e: + print(f"模型优化配置警告: {e}") + + # 缓存先前的检测结果,用于提高稳定性 + last_results = None + skip_counter = 0 + max_skip = 2 # 最多跳过几帧不处理 + + while not stop_event.is_set(): + try: + if processed_frame_queue.qsize() >= processed_frame_queue.maxsize * 0.8: + # 如果输出队列接近满,等待一段时间 + time.sleep(0.01) + continue + + frame, timestamp = frame_queue.get(timeout=0.2) + + # 处理延迟过大的帧 + if time.time() - timestamp > 0.3: # 减少延迟阈值 + continue + + frame_count += 1 + + # 间隔采样,每n帧处理一次,减少计算量 + if skip_counter > 0 and last_results is not None: + skip_counter -= 1 + # 使用上次的检测结果 + annotated_frame = last_results.plot(conf=False, line_width=1, font_size=1.5) + processed_frame_queue.put((annotated_frame, timestamp)) + continue + + process_start = time.time() + + # 动态调整处理尺寸,根据队列积压情况 + resize_scale = 1.0 + if frame_queue.qsize() > frame_queue.maxsize * 0.7: + resize_scale = 0.4 # 高负载时大幅降低分辨率 + elif frame_queue.qsize() > frame_queue.maxsize * 0.5: + resize_scale = 0.6 # 中等负载时适度降低分辨率 + elif frame_queue.qsize() > frame_queue.maxsize * 0.3: + resize_scale = 0.8 # 轻微负载时轻微降低分辨率 + + # 调整图像尺寸以加快处理 + if resize_scale < 1.0: + process_frame = cv2.resize(frame, (0, 0), fx=resize_scale, fy=resize_scale) + else: + process_frame = frame + + # 执行推理 + try: + results = ov_model(process_frame, classes=cls, show=False) + last_results = results[0] # 保存检测结果用于后续帧 + + # 如果尺寸调整过,需要将结果转换回原始尺寸 + if resize_scale < 1.0: + # 绘制检测框 + annotated_frame = cv2.resize(results[0].plot(conf=False, line_width=1, font_size=1.5), + (frame.shape[1], frame.shape[0])) + else: + annotated_frame = results[0].plot(conf=False, line_width=1, font_size=1.5) + + # 在负载高时启用跳帧处理 + if frame_queue.qsize() > frame_queue.maxsize * 0.5: + skip_counter = max_skip + except Exception as infer_err: + print(f"推理错误: {infer_err}") + if last_results is not None: + # 使用上次的结果 + annotated_frame = last_results.plot(conf=False, line_width=1, font_size=1.5) + else: + # 如果没有上次的结果,简单返回原始帧 + annotated_frame = frame.copy() + + process_end = time.time() + process_times.append(process_end - process_start) + if len(process_times) > 30: + process_times.pop(0) + + if frame_count % 30 == 0: + avg_process_time = sum(process_times) / len(process_times) + fps = 1.0 / avg_process_time if avg_process_time > 0 else 0 + print(f"模型处理FPS: {fps:.2f}, 平均处理时间: {avg_process_time*1000:.2f}ms, 队列大小: {frame_queue.qsize()}, 缩放比例: {resize_scale:.2f}") + + processed_frame_queue.put((annotated_frame, timestamp)) + error_count = 0 # 成功处理后重置错误计数 + except queue.Empty: + continue + except Exception as e: + error_count += 1 + print(f"处理帧错误: {e}") + if error_count >= max_errors: + print(f"连续处理错误达到{max_errors}次,正在停止处理...") + stop_event.set() + break + +def write_frames(processed_frame_queue, pipe, size): + """写入帧的线程,添加平滑处理""" + last_write_time = time.time() + target_time_per_frame = 1.0 / 30.0 # 30fps + pipe_error_count = 0 # 添加错误计数 + max_pipe_errors = 3 # 最大容忍错误数 + frame_count = 0 + last_fps_time = time.time() + skipped_frames = 0 + + # 使用队列存储最近几帧,用于在需要时进行插值 + recent_frames = [] + max_recent_frames = 5 # 增加缓存帧数量,提高平滑性 + + # 使用双缓冲机制提高写入速度 + buffer1 = bytearray(size[0] * size[1] * 3) + buffer2 = bytearray(size[0] * size[1] * 3) + current_buffer = buffer1 + + # 帧率控制参数 + min_frame_interval = target_time_per_frame * 0.5 # 允许的最小帧间隔 + max_frame_interval = target_time_per_frame * 2.0 # 允许的最大帧间隔 + + while not stop_event.is_set(): + try: + # 获取处理后的帧,超时时间较短以便更平滑地处理 + frame, timestamp = processed_frame_queue.get(timeout=0.05) + + # 存储最近的帧用于插值 + recent_frames.append(frame) + if len(recent_frames) > max_recent_frames: + recent_frames.pop(0) + + current_time = time.time() + elapsed = current_time - last_write_time + + # 如果两帧间隔太短,考虑合并或跳过 + if elapsed < min_frame_interval and len(recent_frames) >= 2: + skipped_frames += 1 + continue + + # 如果两帧间隔太长,进行插值平滑 + if elapsed > max_frame_interval and len(recent_frames) >= 2: + # 计算需要插入的帧数 + frames_to_insert = min(int(elapsed / target_time_per_frame), 3) + + for i in range(frames_to_insert): + # 创建插值帧 + weight = (i + 1) / (frames_to_insert + 1) + interpolated_frame = cv2.addWeighted(recent_frames[-2], 1-weight, recent_frames[-1], weight, 0) + + # 切换双缓冲 + current_buffer = buffer2 if current_buffer is buffer1 else buffer1 + + # 高效调整大小并写入 + interpolated_resized = cv2.resize(interpolated_frame, size, interpolation=cv2.INTER_LINEAR) + img_bytes = interpolated_resized.tobytes() + + # 写入管道 + pipe.stdin.write(img_bytes) + pipe.stdin.flush() + + # 切换双缓冲 + current_buffer = buffer2 if current_buffer is buffer1 else buffer1 + + # 正常写入当前帧 - 使用高效的调整大小方法 + resized_frame = cv2.resize(frame, size, interpolation=cv2.INTER_LINEAR) + img_bytes = resized_frame.tobytes() + pipe.stdin.write(img_bytes) + pipe.stdin.flush() + + frame_count += 1 + if frame_count % 30 == 0: + current_fps_time = time.time() + fps = 30 / (current_fps_time - last_fps_time) + print(f"推流FPS: {fps:.2f}, 跳过的帧: {skipped_frames}, 队列大小: {processed_frame_queue.qsize()}") + last_fps_time = current_fps_time + skipped_frames = 0 + + last_write_time = time.time() + pipe_error_count = 0 # 成功写入后重置错误计数 + + except queue.Empty: + # 队列为空且有足够的最近帧时,考虑生成插值帧以保持流畅 + if len(recent_frames) >= 2 and time.time() - last_write_time > target_time_per_frame: + try: + # 创建插值帧 + interpolated_frame = cv2.addWeighted(recent_frames[-2], 0.5, recent_frames[-1], 0.5, 0) + + # 切换双缓冲 + current_buffer = buffer2 if current_buffer is buffer1 else buffer1 + + resized_frame = cv2.resize(interpolated_frame, size, interpolation=cv2.INTER_LINEAR) + img_bytes = resized_frame.tobytes() + pipe.stdin.write(img_bytes) + pipe.stdin.flush() + last_write_time = time.time() + except Exception: + pass + continue + except Exception as e: + print(f"写入帧错误: {e}") + pipe_error_count += 1 + if pipe_error_count >= max_pipe_errors: + print("FFmpeg管道错误过多,正在终止进程...") + stop_event.set() # 主动结束所有线程 + break + +def startAIVideo2(video_path, output_url, m1, cls, confidence): + rtmp = output_url + setIfAI(True) + + cap = None + pipe = None + read_thread = None + process_thread = None + write_thread = None + ov_model = None + + try: + + global frame_queue, processed_frame_queue, stop_event + stop_event = Event() + + os.environ["OMP_NUM_THREADS"] = "4" + os.environ["CUDA_VISIBLE_DEVICES"] = "0" + + print(f"PyTorch 可用: {torch.__version__}, CUDA可用: {torch.cuda.is_available()}") + if torch.cuda.is_available(): + print(f"GPU: {torch.cuda.get_device_name(0)}") + torch.cuda.empty_cache() + + print("预加载YOLO模型...") + model_params = {} + try: + test_model = YOLO(m1) + if hasattr(test_model, "task"): + model_params["task"] = "detect" + if torch.cuda.is_available(): + model_params["half"] = True + import inspect + if "verbose" in inspect.signature(YOLO.__init__).parameters: + model_params["verbose"] = False + except Exception as e: + print(f"参数检测失败,将使用默认参数: {e}") + model_params = {} + + retry_count = 0 + while retry_count < 3: + try: + ov_model = YOLO(m1, **model_params) + dummy_frame = np.zeros((1080, 1920, 3), dtype=np.uint8) + for _ in range(3): + ov_model(dummy_frame, classes=cls, conf=confidence, show=False) + print("YOLO模型加载成功并预热完成") + break + except Exception as e: + retry_count += 1 + print(f"YOLO模型加载失败(尝试 {retry_count}/3): {e}") + if "unexpected keyword" in str(e): + param = str(e).split("'")[-2] + if param in model_params: + print(f"移除不支持的参数: {param}") + del model_params[param] + time.sleep(2) + + if ov_model is None: + raise Exception("无法加载YOLO模型") + + ov_model.conf = confidence + + print(f"正在连接视频流: {video_path}") + cap = cv2.VideoCapture(video_path, cv2.CAP_FFMPEG) + cap.set(cv2.CAP_PROP_BUFFERSIZE, 5) + cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*'H264')) + cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1920) + cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 1080) + + if not cap.isOpened(): + raise Exception(f"无法打开视频流: {video_path}") + + try: + cap.set(cv2.CAP_PROP_AUTO_EXPOSURE, 0) + except Exception as e: + print(f"无法设置自动曝光参数: {e}") + + frame_queue = queue.Queue(maxsize=80) + processed_frame_queue = queue.Queue(maxsize=40) + + size = (1920, 1080) + sizeStr = f"{size[0]}x{size[1]}" + + command = [ + 'ffmpeg', '-y', + '-f', 'rawvideo', '-vcodec', 'rawvideo', + '-pix_fmt', 'bgr24', + '-s', sizeStr, + '-r', '30', + '-i', '-', + '-c:v', 'libx264', + '-preset', 'ultrafast', + '-tune', 'zerolatency', + '-x264-params', 'sei=0', + '-pix_fmt', 'yuv420p', + '-f', 'flv', + '-g', '30', + '-keyint_min', '30', + '-sc_threshold', '0', + '-b:v', '2500k', + '-maxrate', '3000k', + '-bufsize', '3000k', + '-threads', '4', + '-vsync', '1', + rtmp + ] + + + print(f"启动FFmpeg推流到: {rtmp}") + pipe = subprocess.Popen(command, shell=False, stdin=subprocess.PIPE, + stdout=subprocess.PIPE, stderr=subprocess.PIPE, + bufsize=10*1024*1024) + + def monitor_ffmpeg_output(pipe): + while not stop_event.is_set(): + line = pipe.stderr.readline().decode('utf-8', errors='ignore') + if line and ("error" in line.lower()): + print(f"FFmpeg错误: {line.strip()}") + if "Cannot open connection" in line: + stop_event.set() + break + + Thread(target=monitor_ffmpeg_output, args=(pipe,), daemon=True).start() + + def read_frames(cap, frame_queue): + while not stop_event.is_set(): + ret, frame = cap.read() + if not ret: + print("读取失败") + break + timestamp = time.time() + try: + frame_queue.put((timestamp, frame), timeout=1) + except queue.Full: + print("帧队列满,跳帧") + + def process_frames(frame_queue, processed_frame_queue, model, cls, confidence): + while not stop_event.is_set(): + try: + timestamp, frame = frame_queue.get(timeout=1) + result = model.predict(source=frame, classes=cls, conf=confidence, verbose=False) + processed = result[0].plot() + processed_frame_queue.put((timestamp, processed), timeout=1) + except queue.Empty: + continue + except Exception as e: + print(f"处理帧错误: {e}") + + def write_frames(processed_frame_queue, pipe, size): + last_timestamp = 0 + while not stop_event.is_set(): + try: + timestamp, frame = processed_frame_queue.get(timeout=1) + if timestamp < last_timestamp: + print(f"跳过闪回帧 {timestamp} < {last_timestamp}") + continue + last_timestamp = timestamp + frame = cv2.resize(frame, size) + pipe.stdin.write(frame.tobytes()) + except Exception as e: + print(f"写入帧错误: {e}") + break + + read_thread = Thread(target=read_frames, args=(cap, frame_queue), daemon=True, name="ReadThread") + process_thread = Thread(target=process_frames, args=(frame_queue, processed_frame_queue, ov_model, cls, confidence), daemon=True, name="ProcessThread") + write_thread = Thread(target=write_frames, args=(processed_frame_queue, pipe, size), daemon=True, name="WriteThread") + + print("开始推流处理...") + read_thread.start() + process_thread.start() + write_thread.start() + + last_check = time.time() + while getIfAI() and not stop_event.is_set(): + if not all([t.is_alive() for t in [read_thread, process_thread, write_thread]]): + print("检测到线程停止,退出") + stop_event.set() + break + if pipe.poll() is not None: + print("FFmpeg退出") + stop_event.set() + break + if time.time() - last_check > 30: + print(f"输入队列: {frame_queue.qsize()}/{frame_queue.maxsize} | 输出队列: {processed_frame_queue.qsize()}/{processed_frame_queue.maxsize}") + last_check = time.time() + time.sleep(0.1) + + except Exception as e: + print(f"错误: {e}") + finally: + print("清理资源...") + stop_event.set() + setIfAI(False) + + for t in [read_thread, process_thread, write_thread]: + if t and t.is_alive(): + t.join(timeout=2) + + try: + if cap: cap.release() + if pipe: + try: + import signal + os.kill(pipe.pid, signal.SIGTERM) + except: pass + pipe.stdin.close() + pipe.terminate() + try: + pipe.wait(timeout=2) + except: + pipe.kill() + except Exception as e: + print(f"释放资源时出错: {e}") + + try: + cv2.destroyAllWindows() + except: + pass + print("资源释放完毕") + + +if __name__ == '__main__': + sn = "1581F6QAD243C00BP71E" + video_path = f"rtmp://222.212.85.86:1935/live/{sn}" + # FFmpeg 推流地址 + rtmp = f"rtmp://222.212.85.86:1935/live/{sn}ai" + + try: + startAIVideo2(video_path, rtmp, "best.pt", [0, 1, 2, 3, 4],0.4) + except KeyboardInterrupt: + print("程序被用户中断") + stopAIVideo() + except Exception as e: + print(f"程序异常: {e}") + diff --git a/ai_for_out/fire.pt b/ai_for_out/fire.pt new file mode 100644 index 0000000..fad2dff Binary files /dev/null and b/ai_for_out/fire.pt differ diff --git a/ai_for_out/gdaq.pt b/ai_for_out/gdaq.pt new file mode 100644 index 0000000..309fae0 Binary files /dev/null and b/ai_for_out/gdaq.pt differ diff --git a/ai_for_out/minio_helper.py b/ai_for_out/minio_helper.py new file mode 100644 index 0000000..3ec3a2b --- /dev/null +++ b/ai_for_out/minio_helper.py @@ -0,0 +1,48 @@ + +import os +from minio import Minio +from minio.error import S3Error + +bucket="300bdf2b-a150-406e-be63-d28bd29b409f" +# 替换为你的MinIO服务器地址、访问密钥和秘密密钥 +def getClient(): + minio_client = Minio( + "222.212.85.86:9000", + access_key="WuRenJi", + secure=False, + secret_key="WRJ@2024",) + return minio_client + +def getPath2(object): + #dir="C:/sy/movies/" + dir=os.getcwd()+"/" + baseName=object + s1=baseName.rfind("/") + dir2=(dir+baseName[0:s1+1]).replace("/","\\") + fName=baseName[s1+1:int(len(baseName))] + os.makedirs(dir2, exist_ok=True) + file_path = os.path.join(dir2, fName) + return file_path + +def upLoad(obj,path): + try: + minio_client=getClient() + minio_client.fput_object(bucket, obj, path) + return True + except S3Error as e: + return False + +def downLoad(obj): + path=getPath2(obj) + if os.path.exists(path): + return path +# 从MinIO的存储桶和对象名称下载 + try: + minio_client=getClient() + minio_client.fget_object(bucket, obj, path) + return path + except S3Error as e: + return "" + +if __name__ == '__main__': + upLoad("aaa/yolo_api.py","yolo_api.py") diff --git a/ai_for_out/smoke.pt b/ai_for_out/smoke.pt new file mode 100644 index 0000000..39be32c Binary files /dev/null and b/ai_for_out/smoke.pt differ diff --git a/ai_for_out/trash.pt b/ai_for_out/trash.pt new file mode 100644 index 0000000..257f1b3 Binary files /dev/null and b/ai_for_out/trash.pt differ diff --git a/ai_for_out/yanwu2.pt b/ai_for_out/yanwu2.pt new file mode 100644 index 0000000..85da7fd Binary files /dev/null and b/ai_for_out/yanwu2.pt differ