ai_project_v1/mqtt_pub.py

289 lines
9.6 KiB
Python
Raw Permalink Normal View History

import json
import paho.mqtt.client as mqtt
import threading
import queue
import time
# MQTT 代理地址和端口
broker = "112.44.103.230" # 公共 MQTT 代理(免费)
port = 1883 # MQTT 默认端口
# 主题
topic = "test/topic"
# 创建消息队列
message_queue = queue.Queue()
# 全局消息队列(可选,如果多个客户端需要共享消息)
global_message_queue = queue.Queue()
class MQTTClient:
def __init__(self, broker, port, topic, use_global_queue=False):
self.broker = broker
self.port = port
self.topic = topic
self.client = mqtt.Client()
self.client.on_connect = self.on_connect
self.client.on_message = self.on_message # 绑定消息回调
self.use_global_queue = use_global_queue # 是否使用全局队列
self.local_mess=None
self.local_message_queue = queue.Queue() if not use_global_queue else None # 本地队列
self.client.connect(broker, port)
self.client.loop_start() # 启动网络线程
def on_connect(self, client, userdata, flags, rc):
"""连接成功时自动订阅主题"""
if rc == 0:
print(f"[MQTT] 成功连接到代理 {self.broker}:{self.port}")
self.client.subscribe(self.topic)
else:
print(f"[MQTT] 连接失败,错误代码: {rc}")
def on_message(self, client, userdata, msg):
"""收到消息时的回调"""
try:
payload = msg.payload.decode("utf-8")
# message_data = {
# "topic": msg.topic,
# "payload": payload,
# "qos": msg.qos,
# "retain": msg.retain,
# "ts": int(time.time() * 1000) # 毫秒级时间戳
# }
# data = json.loads(payload) # 解析为字典
message_data=json.loads(payload) # 解析为字典
# if self.use_global_queue:
# global_message_queue.put(message_data) # 存入全局队列
# else:
# self.local_message_queue.put(message_data) # 存入本地队列
# 增加格式验证,非法格式不做录入
if (message_data is not None and
message_data["data"]["attitude_head"] is not None): # DRC-高频 osd 信息上报 格式验证
self.local_mess=message_data
# print(f"[MQTT 接收] 主题: {msg.topic}, 消息: {payload}")
except Exception as e:
print(f"[MQTT 错误] 消息处理失败: {e}")
def publish_message(self, message):
"""发布消息"""
self.client.publish(self.topic, message)
print(f"[MQTT 发送] 主题: {self.topic}, 消息: {message}")
def get_messages(self, timeout=1):
"""
获取消息支持超时
- 如果使用全局队列 global_message_queue 获取
- 否则从本地队列获取
"""
messages = None
# target_queue = global_message_queue if self.use_global_queue else self.local_message_queue
start_time = time.time()
while time.time() - start_time < timeout:
try:
# msg = target_queue.get(block=True, timeout=0.1)
if self.local_mess:
messages = self.local_mess
# messages.append(msg)
self.local_mess=None
except queue.Empty:
break
return messages
def close(self):
"""关闭连接"""
self.client.loop_stop()
self.client.disconnect()
print("[MQTT] 已断开连接")
# 生产者线程:将消息放入队列
def producer_thread():
# message_queue.put(mess)
for i in range(100):
message = f"Message {i}"
message_queue.put(message)
print(f"生产者线程:已将消息放入队列: {message}")
# time.sleep(1) # 模拟生产间隔
# 消费者线程:从队列中取出消息并发布
def consumer_thread(mqtt_client):
print("consumer_thread")
# while True:
# try:
# message = message_queue.get(timeout=1) # 设置超时以避免无限阻塞
# mqtt_client.publish_message(message)
# message_queue.task_done() # 标记任务完成
# except queue.Empty:
# print("消费者线程:队列为空,等待中...")
# time.sleep(3) # 避免频繁检查队列
if __name__ == "__main__":
# 初始化 MQTT 客户端
mqtt_client = MQTTClient(broker, port, topic)
# # 启动生产者线程
# producer = threading.Thread(target=producer_thread)
# producer.start()
# 启动消费者线程
consumer = threading.Thread(target=consumer_thread, args=(mqtt_client,))
consumer.start()
# # 等待生产者线程完成
# producer.join()
# 等待队列中的所有消息被处理
# message_queue.join()
while True:
mess=mqtt_client.get_new_message()
print(mess)
# 关闭 MQTT 客户端
mqtt_client.close()
time.sleep(5)
print("程序结束")
#
# import json
# import paho.mqtt.client as mqtt
# import threading
# import queue
# import time
# import logging
#
# # 配置日志
# logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# logger = logging.getLogger(__name__)
#
#
# class MQTTClient:
# def __init__(self, broker, port, topic, use_global_queue=False):
# self.broker = broker
# self.port = port
# self.topic = topic
# self.client = mqtt.Client()
# self.client.on_connect = self.on_connect
# self.client.on_message = self.on_message
# self.use_global_queue = use_global_queue
# self.local_message = None
# self.local_message_queue = queue.Queue() if not use_global_queue else None
# self._is_running = False
# self._thread = None
# self._lock = threading.Lock()
#
# def on_connect(self, client, userdata, flags, rc):
# """连接成功时自动订阅主题"""
# if rc == 0:
# logger.info(f"成功连接到代理 {self.broker}:{self.port}")
# client.subscribe(self.topic)
# else:
# logger.error(f"连接失败,错误代码: {rc}")
#
# def on_message(self, client, userdata, msg):
# """收到消息时的回调"""
# try:
# payload = msg.payload.decode("utf-8")
# message_data = json.loads(payload)
#
# # 增加格式验证
# if (message_data and
# isinstance(message_data.get("data"), dict) and
# "attitude_head" in message_data["data"]):
# with self._lock:
# self.local_message = message_data
# if not self.use_global_queue and self.local_message_queue:
# self.local_message_queue.put(message_data)
# except Exception as e:
# logger.error(f"消息处理失败: {e}")
#
# def publish_message(self, topic, message):
# """发布消息到指定主题"""
# self.client.publish(topic, message)
# logger.info(f"发送消息 - 主题: {topic}, 内容: {message}")
#
# def get_messages(self, timeout=1):
# """获取消息(支持超时)"""
# messages = None
# start_time = time.time()
#
# while time.time() - start_time < timeout:
# with self._lock:
# if self.local_message:
# messages = self.local_message
# self.local_message = None
# return messages
# time.sleep(0.1) # 避免CPU空转
# return None
#
# def loop_start(self):
# """启动独立的后台线程来处理MQTT网络通信"""
# if not self._is_running:
# self._is_running = True
# self._thread = threading.Thread(target=self._run_loop, daemon=True)
# self._thread.start()
# logger.info("MQTT后台线程已启动")
#
# def loop_stop(self):
# """停止后台线程"""
# if self._is_running:
# self._is_running = False
# if self._thread:
# self._thread.join()
# self.client.loop_stop()
# logger.info("MQTT后台线程已停止")
#
# def _run_loop(self):
# """后台线程运行的主循环"""
# try:
# self.client.connect(self.broker, self.port)
# self.client.loop_start() # 启动paho的内部线程
#
# while self._is_running:
# time.sleep(0.1) # 保持线程运行
# except Exception as e:
# logger.error(f"MQTT线程异常: {e}")
# finally:
# self.client.loop_stop()
#
# def close(self):
# """关闭连接"""
# self.loop_stop()
# self.client.disconnect()
# logger.info("MQTT连接已关闭")
#
# def __del__(self):
# """析构时自动清理"""
# self.close()
#
#
# # 测试代码
# if __name__ == "__main__":
# try:
# # 初始化MQTT客户端
# broker = "112.44.103.230"
# port = 1883
# topic = "test/topic"
#
# mqtt_client = MQTTClient(broker, port, topic)
# mqtt_client.loop_start() # 启动后台线程
#
# # 模拟持续接收消息
# while True:
# message = mqtt_client.get_messages(timeout=5)
# if message:
# print(f"收到消息: {message}")
# else:
# print("等待消息...")
#
# except KeyboardInterrupt:
# print("\n正在关闭...")
# finally:
# mqtt_client.close()
# time.sleep(1)
# print("程序结束")
#