95 lines
2.6 KiB
Python
95 lines
2.6 KiB
Python
import paho.mqtt.client as mqtt
|
||
import json
|
||
from datetime import datetime
|
||
|
||
# MQTT 配置
|
||
MQTT_BROKER = "localhost" # 本地MQTT代理地址
|
||
MQTT_PORT = 1883 # 默认MQTT端口
|
||
MQTT_TOPIC = "test/topic" # 订阅的主题
|
||
MQTT_CLIENT_ID = "message_processor" # 客户端ID
|
||
|
||
|
||
def on_connect(client, userdata, flags, rc):
|
||
"""连接回调函数"""
|
||
print(f"Connected to MQTT broker with result code {rc}")
|
||
# 订阅主题
|
||
client.subscribe(MQTT_TOPIC)
|
||
print(f"Subscribed to topic: {MQTT_TOPIC}")
|
||
|
||
|
||
def on_message(client, userdata, msg):
|
||
"""消息接收回调函数"""
|
||
try:
|
||
# 1. 接收原始消息
|
||
raw_message = msg.payload.decode('utf-8')
|
||
print(f"\n[{datetime.now()}] Received raw message:")
|
||
print(raw_message)
|
||
|
||
# 2. 二次处理 - 这里示例处理JSON格式消息
|
||
processed_data = process_message(raw_message)
|
||
|
||
# 3. 输出处理结果
|
||
print("\nProcessed result:")
|
||
print(json.dumps(processed_data, indent=2, ensure_ascii=False))
|
||
|
||
except Exception as e:
|
||
print(f"Error processing message: {e}")
|
||
|
||
|
||
def process_message(raw_message):
|
||
"""
|
||
消息二次处理函数
|
||
这里可以根据实际需求实现各种处理逻辑
|
||
"""
|
||
try:
|
||
# 尝试解析为JSON
|
||
data = json.loads(raw_message)
|
||
|
||
# 示例处理:添加处理时间戳
|
||
processed = {
|
||
"original_data": data,
|
||
"processing_timestamp": datetime.now().isoformat(),
|
||
"status": "processed"
|
||
}
|
||
|
||
# 可以在这里添加更多处理逻辑
|
||
# 例如:数据转换、计算、过滤等
|
||
|
||
return processed
|
||
|
||
except json.JSONDecodeError:
|
||
# 如果不是JSON,按文本处理
|
||
return {
|
||
"text_content": raw_message,
|
||
"word_count": len(raw_message.split()),
|
||
"char_count": len(raw_message)
|
||
}
|
||
|
||
|
||
def main():
|
||
# 创建MQTT客户端
|
||
client = mqtt.Client(client_id=MQTT_CLIENT_ID, clean_session=True)
|
||
|
||
# 设置回调函数
|
||
client.on_connect = on_connect
|
||
client.on_message = on_message
|
||
|
||
try:
|
||
# 连接到MQTT代理
|
||
client.connect(MQTT_BROKER, MQTT_PORT, keepalive=60)
|
||
|
||
print(f"Starting MQTT client to listen on {MQTT_BROKER}:{MQTT_PORT}, topic: {MQTT_TOPIC}")
|
||
print("Waiting for messages...")
|
||
|
||
# 启动网络循环
|
||
client.loop_forever()
|
||
|
||
except KeyboardInterrupt:
|
||
print("\nDisconnecting from MQTT broker...")
|
||
client.disconnect()
|
||
except Exception as e:
|
||
print(f"Error: {e}")
|
||
|
||
|
||
if __name__ == "__main__":
|
||
main() |