import json import paho.mqtt.client as mqtt import threading import queue import time # MQTT 代理地址和端口 broker = "112.44.103.230" # 公共 MQTT 代理(免费) port = 1883 # MQTT 默认端口 # 主题 topic = "test/topic" # 创建消息队列 message_queue = queue.Queue() # 全局消息队列(可选,如果多个客户端需要共享消息) global_message_queue = queue.Queue() class MQTTClient: def __init__(self, broker, port, topic, use_global_queue=False): self.broker = broker self.port = port self.topic = topic self.client = mqtt.Client() self.client.on_connect = self.on_connect self.client.on_message = self.on_message # 绑定消息回调 self.use_global_queue = use_global_queue # 是否使用全局队列 self.local_mess=None self.local_message_queue = queue.Queue() if not use_global_queue else None # 本地队列 self.client.connect(broker, port) self.client.loop_start() # 启动网络线程 def on_connect(self, client, userdata, flags, rc): """连接成功时自动订阅主题""" if rc == 0: print(f"[MQTT] 成功连接到代理 {self.broker}:{self.port}") self.client.subscribe(self.topic) else: print(f"[MQTT] 连接失败,错误代码: {rc}") def on_message(self, client, userdata, msg): """收到消息时的回调""" try: payload = msg.payload.decode("utf-8") # message_data = { # "topic": msg.topic, # "payload": payload, # "qos": msg.qos, # "retain": msg.retain, # "ts": int(time.time() * 1000) # 毫秒级时间戳 # } # data = json.loads(payload) # 解析为字典 message_data=json.loads(payload) # 解析为字典 # if self.use_global_queue: # global_message_queue.put(message_data) # 存入全局队列 # else: # self.local_message_queue.put(message_data) # 存入本地队列 # 增加格式验证,非法格式不做录入 if (message_data is not None and message_data["data"]["attitude_head"] is not None): # DRC-高频 osd 信息上报 格式验证 self.local_mess=message_data # print(f"[MQTT 接收] 主题: {msg.topic}, 消息: {payload}") except Exception as e: print(f"[MQTT 错误] 消息处理失败: {e}") def publish_message(self, message): """发布消息""" self.client.publish(self.topic, message) print(f"[MQTT 发送] 主题: {self.topic}, 消息: {message}") def get_messages(self, timeout=1): """ 获取消息(支持超时) - 如果使用全局队列,从 global_message_queue 获取 - 否则从本地队列获取 """ messages = None # target_queue = global_message_queue if self.use_global_queue else self.local_message_queue start_time = time.time() while time.time() - start_time < timeout: try: # msg = target_queue.get(block=True, timeout=0.1) if self.local_mess: messages = self.local_mess # messages.append(msg) self.local_mess=None except queue.Empty: break return messages def close(self): """关闭连接""" self.client.loop_stop() self.client.disconnect() print("[MQTT] 已断开连接") # 生产者线程:将消息放入队列 def producer_thread(): # message_queue.put(mess) for i in range(100): message = f"Message {i}" message_queue.put(message) print(f"生产者线程:已将消息放入队列: {message}") # time.sleep(1) # 模拟生产间隔 # 消费者线程:从队列中取出消息并发布 def consumer_thread(mqtt_client): print("consumer_thread") # while True: # try: # message = message_queue.get(timeout=1) # 设置超时以避免无限阻塞 # mqtt_client.publish_message(message) # message_queue.task_done() # 标记任务完成 # except queue.Empty: # print("消费者线程:队列为空,等待中...") # time.sleep(3) # 避免频繁检查队列 if __name__ == "__main__": # 初始化 MQTT 客户端 mqtt_client = MQTTClient(broker, port, topic) # # 启动生产者线程 # producer = threading.Thread(target=producer_thread) # producer.start() # 启动消费者线程 consumer = threading.Thread(target=consumer_thread, args=(mqtt_client,)) consumer.start() # # 等待生产者线程完成 # producer.join() # 等待队列中的所有消息被处理 # message_queue.join() while True: mess=mqtt_client.get_new_message() print(mess) # 关闭 MQTT 客户端 mqtt_client.close() time.sleep(5) print("程序结束") # # import json # import paho.mqtt.client as mqtt # import threading # import queue # import time # import logging # # # 配置日志 # logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') # logger = logging.getLogger(__name__) # # # class MQTTClient: # def __init__(self, broker, port, topic, use_global_queue=False): # self.broker = broker # self.port = port # self.topic = topic # self.client = mqtt.Client() # self.client.on_connect = self.on_connect # self.client.on_message = self.on_message # self.use_global_queue = use_global_queue # self.local_message = None # self.local_message_queue = queue.Queue() if not use_global_queue else None # self._is_running = False # self._thread = None # self._lock = threading.Lock() # # def on_connect(self, client, userdata, flags, rc): # """连接成功时自动订阅主题""" # if rc == 0: # logger.info(f"成功连接到代理 {self.broker}:{self.port}") # client.subscribe(self.topic) # else: # logger.error(f"连接失败,错误代码: {rc}") # # def on_message(self, client, userdata, msg): # """收到消息时的回调""" # try: # payload = msg.payload.decode("utf-8") # message_data = json.loads(payload) # # # 增加格式验证 # if (message_data and # isinstance(message_data.get("data"), dict) and # "attitude_head" in message_data["data"]): # with self._lock: # self.local_message = message_data # if not self.use_global_queue and self.local_message_queue: # self.local_message_queue.put(message_data) # except Exception as e: # logger.error(f"消息处理失败: {e}") # # def publish_message(self, topic, message): # """发布消息到指定主题""" # self.client.publish(topic, message) # logger.info(f"发送消息 - 主题: {topic}, 内容: {message}") # # def get_messages(self, timeout=1): # """获取消息(支持超时)""" # messages = None # start_time = time.time() # # while time.time() - start_time < timeout: # with self._lock: # if self.local_message: # messages = self.local_message # self.local_message = None # return messages # time.sleep(0.1) # 避免CPU空转 # return None # # def loop_start(self): # """启动独立的后台线程来处理MQTT网络通信""" # if not self._is_running: # self._is_running = True # self._thread = threading.Thread(target=self._run_loop, daemon=True) # self._thread.start() # logger.info("MQTT后台线程已启动") # # def loop_stop(self): # """停止后台线程""" # if self._is_running: # self._is_running = False # if self._thread: # self._thread.join() # self.client.loop_stop() # logger.info("MQTT后台线程已停止") # # def _run_loop(self): # """后台线程运行的主循环""" # try: # self.client.connect(self.broker, self.port) # self.client.loop_start() # 启动paho的内部线程 # # while self._is_running: # time.sleep(0.1) # 保持线程运行 # except Exception as e: # logger.error(f"MQTT线程异常: {e}") # finally: # self.client.loop_stop() # # def close(self): # """关闭连接""" # self.loop_stop() # self.client.disconnect() # logger.info("MQTT连接已关闭") # # def __del__(self): # """析构时自动清理""" # self.close() # # # # 测试代码 # if __name__ == "__main__": # try: # # 初始化MQTT客户端 # broker = "112.44.103.230" # port = 1883 # topic = "test/topic" # # mqtt_client = MQTTClient(broker, port, topic) # mqtt_client.loop_start() # 启动后台线程 # # # 模拟持续接收消息 # while True: # message = mqtt_client.get_messages(timeout=5) # if message: # print(f"收到消息: {message}") # else: # print("等待消息...") # # except KeyboardInterrupt: # print("\n正在关闭...") # finally: # mqtt_client.close() # time.sleep(1) # print("程序结束") #