398 lines
15 KiB
Python
398 lines
15 KiB
Python
import asyncio
import json
import logging
import time
import uuid
from typing import Set, Dict

import paho.mqtt.client as mqtt

from touying.getmq_sendresult import CalTouYing
|
|
# Configure logging: INFO threshold, timestamped "time - name - level - message" lines.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)

# Module-level logger used by everything in this file.
logger = logging.getLogger(__name__)
|
|
#
|
|
## MQTT客户端类 - 使用异步处理
|
|
#class AsyncMQTTClient:
|
|
# def __init__(self):
|
|
# self.client = None
|
|
# self.broker = None
|
|
# self.port = None
|
|
# self.connected = False
|
|
# self.websocket_connections: Set = set()
|
|
# self.current_topic = None
|
|
# self.calculator = CalTouYing() # 初始化计算器
|
|
# self.loop = None # 存储事件循环引用
|
|
# self.loop_lock = asyncio.Lock() # 添加锁确保线程安全
|
|
#
|
|
# async def connect(self, broker: str, port: int = 1883, client_id: str = "sanic-mqtt-client"):
|
|
# """异步连接到MQTT服务器"""
|
|
# if self.connected and self.broker == broker and self.port == port:
|
|
# logger.info("Already connected to the same MQTT broker")
|
|
# return True
|
|
#
|
|
# # 获取当前事件循环并保存引用
|
|
# self.loop = asyncio.get_event_loop()
|
|
#
|
|
# self.broker = broker
|
|
# self.port = port
|
|
#
|
|
# # 创建MQTT客户端
|
|
# self.client = mqtt.Client(client_id=client_id)
|
|
# self.client.on_disconnect = self._on_disconnect
|
|
# self.client.on_message = self._on_message
|
|
#
|
|
# # 使用事件循环处理MQTT网络操作
|
|
# future = self.loop.create_future()
|
|
#
|
|
# def on_connect_wrapper(client, userdata, flags, rc):
|
|
# if rc == 0:
|
|
# logger.info(f"Connected to MQTT broker: {broker}:{port}")
|
|
# self.connected = True
|
|
# self.loop.call_soon_threadsafe(future.set_result, True)
|
|
# else:
|
|
# logger.error(f"MQTT connection failed with code: {rc}")
|
|
# self.loop.call_soon_threadsafe(future.set_exception, ConnectionError(f"Connection failed: {rc}"))
|
|
#
|
|
# self.client.on_connect = on_connect_wrapper
|
|
#
|
|
# try:
|
|
# self.client.connect(broker, port)
|
|
# self.client.loop_start()
|
|
# await future
|
|
# return True
|
|
# except Exception as e:
|
|
# logger.error(f"Failed to connect to MQTT broker: {e}")
|
|
# return False
|
|
#
|
|
# async def disconnect(self):
|
|
# """异步断开MQTT连接"""
|
|
# if self.connected and self.client:
|
|
# future = self.loop.create_future()
|
|
#
|
|
# def on_disconnect_wrapper(client, userdata, rc):
|
|
# self.connected = False
|
|
# logger.info("Disconnected from MQTT broker")
|
|
# self.loop.call_soon_threadsafe(future.set_result, None)
|
|
#
|
|
# self.client.on_disconnect = on_disconnect_wrapper
|
|
# self.client.disconnect()
|
|
# await future
|
|
#
|
|
# async def subscribe(self, topic: str):
|
|
# """异步订阅MQTT主题"""
|
|
# if not self.connected:
|
|
# logger.warning("Not connected to MQTT broker, cannot subscribe")
|
|
# return False
|
|
#
|
|
# if self.current_topic == topic:
|
|
# logger.info(f"Already subscribed to topic: {topic}")
|
|
# return True
|
|
#
|
|
# self.current_topic = topic
|
|
#
|
|
# # 使用异步方式订阅
|
|
# future = self.loop.create_future()
|
|
#
|
|
# def on_subscribe_wrapper(client, userdata, mid, granted_qos):
|
|
# logger.info(f"Subscribed to MQTT topic: {topic}")
|
|
# self.loop.call_soon_threadsafe(future.set_result, True)
|
|
#
|
|
# self.client.on_subscribe = on_subscribe_wrapper
|
|
# self.client.subscribe(topic)
|
|
#
|
|
# try:
|
|
# await future
|
|
# return True
|
|
# except Exception as e:
|
|
# logger.error(f"Failed to subscribe to topic: {e}")
|
|
# return False
|
|
#
|
|
# def add_websocket(self, ws):
|
|
# """添加WebSocket连接"""
|
|
# self.websocket_connections.add(ws)
|
|
# logger.info(f"WebSocket connection added. Total connections: {len(self.websocket_connections)}")
|
|
#
|
|
# def remove_websocket(self, ws):
|
|
# """移除WebSocket连接"""
|
|
# if ws in self.websocket_connections:
|
|
# self.websocket_connections.remove(ws)
|
|
# logger.info(f"WebSocket connection removed. Total connections: {len(self.websocket_connections)}")
|
|
#
|
|
# async def broadcast(self, message: str):
|
|
# """广播消息到所有WebSocket连接"""
|
|
# if not self.websocket_connections:
|
|
# return
|
|
#
|
|
# # 创建连接副本,防止迭代过程中集合被修改
|
|
# connections = list(self.websocket_connections)
|
|
# for ws in connections:
|
|
# try:
|
|
# await ws.send(message)
|
|
# except Exception as e:
|
|
# logger.error(f"Error sending message to WebSocket: {e}")
|
|
# self.remove_websocket(ws)
|
|
#
|
|
# def _on_disconnect(self, client, userdata, rc):
|
|
# """MQTT断开连接回调"""
|
|
# self.connected = False
|
|
# logger.warning(f"MQTT disconnected with code: {rc}")
|
|
#
|
|
# def _on_message(self, client, userdata, msg):
|
|
# """MQTT消息接收回调"""
|
|
# try:
|
|
# payload = msg.payload.decode('utf-8')
|
|
# logger.debug(f"Received MQTT message on topic '{msg.topic}': {payload}")
|
|
#
|
|
# # 解析JSON消息
|
|
# try:
|
|
# data = json.loads(payload)
|
|
# except json.JSONDecodeError:
|
|
# data = {"raw_message": payload}
|
|
#
|
|
# # 使用CalTouYing类处理消息
|
|
# processed_data = self.calculator.process(data)
|
|
#
|
|
# if processed_data is not None and len(processed_data["result"])>0:
|
|
#
|
|
# # 添加元数据
|
|
# processed_data["metadata"] = {
|
|
# "topic": msg.topic,
|
|
# "timestamp": int(time.time() * 1000)
|
|
# }
|
|
#
|
|
# # 线程安全地将广播任务提交到事件循环
|
|
# async def broadcast_task():
|
|
# await self.broadcast(json.dumps(processed_data))
|
|
#
|
|
# # 修复:确保在事件循环运行时提交任务
|
|
# if self.loop and self.loop.is_running():
|
|
# self.loop.call_soon_threadsafe(
|
|
# lambda: asyncio.create_task(broadcast_task())
|
|
# )
|
|
# else:
|
|
# logger.error("Event loop is not running, cannot process message")
|
|
#
|
|
# except Exception as e:
|
|
# logger.error(f"Error processing MQTT message: {e}")
|
|
|
|
|
|
|
|
from sanic import Websocket
|
|
|
|
# Registry of every active connection, mapping a string key to the
# ConnectionContext that holds that connection's state.
# NOTE(review): the key is presumably the connection_id -- confirm at the call sites.
active_connections: Dict[str, "ConnectionContext"] = dict()
|
|
|
|
|
|
class ConnectionContext:
    """State bundle kept for a single WebSocket connection.

    Fields:
        connection_id: identifier supplied by the caller.
        ws: the WebSocket object this context belongs to.
        mqtt_client: a dedicated ``AsyncMQTTClient`` built for this connection.
        calculator: starts as ``None``; not assigned anywhere in this class.
        file_path: starts as ``None``; not assigned anywhere in this class.
    """

    def __init__(self, connection_id: str, ws: Websocket):
        # Identity of the connection and the socket it talks through.
        self.connection_id = connection_id
        self.ws = ws

        # Each connection owns its own MQTT client instance.
        self.mqtt_client = AsyncMQTTClient(connection_id)

        # Filled in later by code outside this class.
        self.calculator = self.file_path = None
|
|
|
|
|
|
class AsyncMQTTClient:
    """Asyncio-facing wrapper around a paho-mqtt client, one instance per WebSocket connection.

    paho-mqtt runs its network loop in a background thread (``loop_start``),
    so every paho callback executes outside the asyncio event loop.  Results
    are handed back to the loop with ``call_soon_threadsafe`` /
    ``run_coroutine_threadsafe``; asyncio futures are never touched directly
    from the paho thread.
    """

    def __init__(self, connection_id: str):
        self.connection_id = connection_id
        self.client = None
        self.broker = None
        self.port = None
        self.connected = False
        self.current_topic = None
        self.loop = None  # event loop captured by connect(); used for thread hand-off

    # ------------------------------------------------------------------
    # internal helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _new_client(client_id):
        """Create a paho client that uses the v1 callback signatures on paho-mqtt 1.x and 2.x."""
        api_version = getattr(mqtt, "CallbackAPIVersion", None)
        if api_version is not None:
            # paho-mqtt >= 2.0 wants the callback API version as first argument.
            return mqtt.Client(api_version.VERSION1, client_id=client_id)
        return mqtt.Client(client_id=client_id)

    def _resolve(self, future, result=None, error=None):
        """Complete *future* from any thread; a future that is already done is left alone.

        paho may invoke a callback more than once (e.g. automatic reconnect)
        or after the awaiting coroutine timed out, so completing blindly
        would raise ``InvalidStateError`` inside the event loop.
        """
        def _apply():
            if future.done():
                return
            if error is not None:
                future.set_exception(error)
            else:
                future.set_result(result)

        loop = self.loop
        if loop is not None and not loop.is_closed():
            loop.call_soon_threadsafe(_apply)

    def _teardown_client(self):
        """Stop and drop the current paho client (if any) and reset connection state."""
        client, self.client = self.client, None
        if client is None:
            return
        # Detach callbacks first so a late event cannot flip our state back.
        client.on_connect = None
        client.on_disconnect = None
        client.on_message = None
        client.on_subscribe = None
        try:
            client.disconnect()
        except Exception as e:
            logger.debug(f"Connection {self.connection_id} ignoring error while closing MQTT client: {e}")
        client.loop_stop()
        self.connected = False
        self.current_topic = None

    def _find_context(self):
        """Return the ConnectionContext that owns this client, or ``None``."""
        # Fast path, valid only if the registry happens to be keyed by connection_id.
        candidate = active_connections.get(self.connection_id)
        if candidate is not None and candidate.mqtt_client is self:
            return candidate
        # Iterate over a snapshot: this runs on the paho thread while the
        # event-loop thread may add or remove registry entries.
        for conn_context in list(active_connections.values()):
            if conn_context.mqtt_client is self:
                return conn_context
        return None

    # ------------------------------------------------------------------
    # public coroutine API
    # ------------------------------------------------------------------
    async def connect(self, broker: str, port: int = 1883, client_id: str = None):
        """Connect to an MQTT broker and wait for the broker's CONNACK.

        Args:
            broker: broker host name or address.
            port: broker TCP port.
            client_id: MQTT client id; a unique one is generated when omitted.

        Returns:
            True when connected (or already connected to the same broker/port),
            False when the connection attempt failed.
        """
        if self.connected and self.broker == broker and self.port == port:
            logger.info(f"Connection {self.connection_id} already connected to MQTT broker {broker}:{port}")
            return True

        # Remember the loop we are running on; paho callbacks need it later.
        self.loop = asyncio.get_running_loop()

        # Release any previous client (other broker, or a dropped link) so its
        # network thread does not leak; this also clears the stale topic.
        self._teardown_client()

        self.broker = broker
        self.port = port

        # Generate a unique client id unless the caller supplied one.
        client_id = client_id or f"sanic-mqtt-client-{self.connection_id}-{uuid.uuid4().hex[:8]}"

        self.client = self._new_client(client_id)
        self.client.on_disconnect = self._on_disconnect
        self.client.on_message = self._on_message

        future = self.loop.create_future()

        def on_connect_wrapper(client, userdata, flags, rc):
            if rc == 0:
                logger.info(f"Connection {self.connection_id} connected to MQTT broker: {broker}:{port}")
                self.connected = True
                self._resolve(future, result=True)
            else:
                logger.error(f"Connection {self.connection_id} MQTT connection failed with code: {rc}")
                self._resolve(future, error=ConnectionError(f"Connection failed: {rc}"))

        self.client.on_connect = on_connect_wrapper

        try:
            # client.connect() performs a blocking TCP connect; keep it off the event loop.
            await self.loop.run_in_executor(None, self.client.connect, broker, port)
            self.client.loop_start()
            await future
            return True
        except Exception as e:
            logger.error(f"Connection {self.connection_id} failed to connect to MQTT broker: {e}")
            # Stop the network thread, otherwise paho keeps retrying in the background.
            self._teardown_client()
            return False

    async def disconnect(self):
        """Disconnect from the broker and stop the paho network thread.

        Waits up to five seconds for paho's disconnect callback.  The network
        thread is stopped in every case, including when the link is already
        down.
        """
        client = self.client
        if client is None:
            return

        if not self.connected:
            # Link already dropped, but the network thread may still be running.
            self._teardown_client()
            return

        future = self.loop.create_future()

        def on_disconnect_wrapper(client, userdata, rc):
            self.connected = False
            logger.info(f"Connection {self.connection_id} disconnected from MQTT broker")
            self._resolve(future)

        client.on_disconnect = on_disconnect_wrapper
        client.disconnect()

        try:
            # Wait for the disconnect to complete, bounded by a timeout.
            await asyncio.wait_for(future, timeout=5.0)
        except asyncio.TimeoutError:
            logger.warning(f"Connection {self.connection_id} MQTT disconnect timed out")
        finally:
            client.loop_stop()
            self.connected = False
            # A later connect() starts without subscriptions.
            self.current_topic = None

    async def subscribe(self, topic: str):
        """Subscribe to *topic* and wait for the broker's acknowledgement.

        Returns:
            True when subscribed (or when *topic* is already the current
            topic), False when not connected or the subscription failed.
        """
        if not self.connected:
            logger.warning(f"Connection {self.connection_id} not connected to MQTT broker, cannot subscribe")
            return False

        if self.current_topic == topic:
            logger.info(f"Connection {self.connection_id} already subscribed to topic: {topic}")
            return True

        future = self.loop.create_future()

        def on_subscribe_wrapper(client, userdata, mid, granted_qos):
            logger.info(f"Connection {self.connection_id} subscribed to MQTT topic: {topic}")
            self._resolve(future, result=True)

        self.client.on_subscribe = on_subscribe_wrapper

        try:
            rc, _mid = self.client.subscribe(topic)
            if rc != mqtt.MQTT_ERR_SUCCESS:
                # No SUBACK will ever arrive; fail now instead of waiting forever.
                raise ConnectionError(f"subscribe returned error code {rc}")
            await future
        except Exception as e:
            logger.error(f"Connection {self.connection_id} failed to subscribe to topic: {e}")
            return False

        # Record the topic only once the broker has acknowledged it, so a
        # failed attempt can be retried.
        self.current_topic = topic
        return True

    async def publish(self, topic, message):
        """Queue *message* for publication on *topic*.

        paho's ``publish`` only enqueues the message and does not block, so it
        is called directly.

        Returns:
            True when paho accepted the message, False when not connected or
            paho rejected it.
        """
        if not self.connected:
            logger.warning(f"Connection {self.connection_id} not connected to MQTT broker, cannot publish")
            return False

        try:
            info = self.client.publish(topic, message)
        except Exception as e:
            logger.error(f"Connection {self.connection_id} failed to publish to topic '{topic}': {e}")
            return False

        if info.rc != mqtt.MQTT_ERR_SUCCESS:
            logger.error(f"Connection {self.connection_id} publish to topic '{topic}' failed with code: {info.rc}")
            return False
        return True

    # ------------------------------------------------------------------
    # paho callbacks (run on the paho network thread)
    # ------------------------------------------------------------------
    def _on_disconnect(self, client, userdata, rc):
        """paho disconnect callback: record that the link is down."""
        self.connected = False
        logger.warning(f"Connection {self.connection_id} MQTT disconnected with code: {rc}")

    def _on_message(self, client, userdata, msg):
        """paho message callback: process the payload and forward it to the owning WebSocket.

        The payload is decoded as UTF-8 and parsed as JSON (falling back to
        ``{"raw_message": payload}``), run through the context's calculator,
        tagged with metadata and sent on the event loop.  An empty result is
        dropped silently.
        """
        try:
            payload = msg.payload.decode('utf-8')
            logger.debug(f"Connection {self.connection_id} received MQTT message on topic '{msg.topic}': {payload}")

            try:
                data = json.loads(payload)
            except json.JSONDecodeError:
                data = {"raw_message": payload}

            context = self._find_context()
            if context is None:
                logger.error(f"No connection context found for MQTT client {self.connection_id}")
                return

            if context.calculator:
                processed_data = context.calculator.process(data)
            else:
                processed_data = {"result": [], "warning": "Calculator not initialized"}

            if not processed_data:
                return

            metadata = {
                "topic": msg.topic,
                "timestamp": int(time.time() * 1000),
                "connection_id": self.connection_id,
            }
            if isinstance(processed_data, dict):
                if not processed_data.get("result"):
                    # Nothing to report (missing, None or empty result).
                    return
                processed_data["metadata"] = metadata
            else:
                processed_data = {"result": processed_data, "metadata": metadata}

            # Serialise here so the event loop only performs the send, and a
            # serialisation error is reported by the handler below.
            message = json.dumps(processed_data)

            async def send_to_websocket():
                try:
                    await context.ws.send(message)
                except Exception as e:
                    logger.error(f"Connection {self.connection_id} error sending to WebSocket: {e}")

            loop = self.loop
            if loop and loop.is_running():
                # Thread-safe submission of a coroutine to the event loop.
                asyncio.run_coroutine_threadsafe(send_to_websocket(), loop)
            else:
                logger.error(f"Connection {self.connection_id} event loop not running, cannot send message")

        except Exception as e:
            logger.error(f"Connection {self.connection_id} error processing MQTT message: {e}", exc_info=True)
|
|
|
|
|