95 lines
2.6 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import paho.mqtt.client as mqtt
import json
from datetime import datetime
# MQTT 配置
MQTT_BROKER = "localhost" # 本地MQTT代理地址
MQTT_PORT = 1883 # 默认MQTT端口
MQTT_TOPIC = "test/topic" # 订阅的主题
MQTT_CLIENT_ID = "message_processor" # 客户端ID
def on_connect(client, userdata, flags, rc):
"""连接回调函数"""
print(f"Connected to MQTT broker with result code {rc}")
# 订阅主题
client.subscribe(MQTT_TOPIC)
print(f"Subscribed to topic: {MQTT_TOPIC}")
def on_message(client, userdata, msg):
"""消息接收回调函数"""
try:
# 1. 接收原始消息
raw_message = msg.payload.decode('utf-8')
print(f"\n[{datetime.now()}] Received raw message:")
print(raw_message)
# 2. 二次处理 - 这里示例处理JSON格式消息
processed_data = process_message(raw_message)
# 3. 输出处理结果
print("\nProcessed result:")
print(json.dumps(processed_data, indent=2, ensure_ascii=False))
except Exception as e:
print(f"Error processing message: {e}")
def process_message(raw_message):
"""
消息二次处理函数
这里可以根据实际需求实现各种处理逻辑
"""
try:
# 尝试解析为JSON
data = json.loads(raw_message)
# 示例处理:添加处理时间戳
processed = {
"original_data": data,
"processing_timestamp": datetime.now().isoformat(),
"status": "processed"
}
# 可以在这里添加更多处理逻辑
# 例如:数据转换、计算、过滤等
return processed
except json.JSONDecodeError:
# 如果不是JSON按文本处理
return {
"text_content": raw_message,
"word_count": len(raw_message.split()),
"char_count": len(raw_message)
}
def main():
# 创建MQTT客户端
client = mqtt.Client(client_id=MQTT_CLIENT_ID, clean_session=True)
# 设置回调函数
client.on_connect = on_connect
client.on_message = on_message
try:
# 连接到MQTT代理
client.connect(MQTT_BROKER, MQTT_PORT, keepalive=60)
print(f"Starting MQTT client to listen on {MQTT_BROKER}:{MQTT_PORT}, topic: {MQTT_TOPIC}")
print("Waiting for messages...")
# 启动网络循环
client.loop_forever()
except KeyboardInterrupt:
print("\nDisconnecting from MQTT broker...")
client.disconnect()
except Exception as e:
print(f"Error: {e}")
if __name__ == "__main__":
main()