| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109 |
- import logging
- import paho.mqtt.client as mqtt
- from pydantic import BaseModel
- import json
- from typing import Dict, Callable, Optional
- from src.models.device import DeviceStatus
- logger = logging.getLogger(__name__)
- class InsertedCell(BaseModel):
- device_id: int
- slot_id: int
- cell_id: int
- class MQTTService:
- def __init__(self, config: dict):
- self.config = config
- broker_address = config['mqtt']['broker_address']
- port = config['mqtt']['port']
- keepalive = config['mqtt']['keepalive']
- username = config['mqtt']['username']
- password = config['mqtt']['password']
- debug = config['mqtt']['debug']
-
- self.client = mqtt.Client()
- self.client.username_pw_set(username, password)
- self.client.on_connect = self.on_connect
- self.client.on_message = self.on_message
- self.devices: dict[int, int] = {}
- self.insertion_callbacks: Dict[str, Dict[int, Callable]] = {}
- if debug:
- logger.info("No MQTT in debug mode")
- return
-
- if broker_address == "debug" or debug:
- self.client.connect("test.mosquitto.org", 1883)
- return
-
- try:
- self.client.connect(broker_address, port, keepalive)
- self.client.loop_start()
- except ConnectionRefusedError as e:
- raise ConnectionError(f"Failed to connect to MQTT broker at {broker_address}:{port}") from e
- def register_device(self, device_id, num_slots, callback: Optional[Callable] = None):
- """Register a new device to handle"""
- self.devices[device_id] = num_slots
- self.insertion_callbacks[device_id] = {}
- if callback:
- for slot in range(num_slots):
- self.insertion_callbacks[device_id][slot] = callback
- def _subscribe_device_topics(self, device_id: int):
- """Subscribe to all topics for a specific device"""
- topics = [
- f"cells_inserted/device_{device_id}",
- ]
- for topic in topics:
- self.client.subscribe(topic)
- logger.info(f"Subscribed to {topic}")
- def on_connect(self, client, userdata, flags, rc):
- if rc == 0:
- logger.info("Connected to MQTT Broker!")
- else:
- raise ConnectionError(f"Failed to connect, return code {rc}")
- # Resubscribe to all device topics on reconnect
- for device_id in self.devices.keys():
- self._subscribe_device_topics(device_id)
- def on_message(self, client, userdata, msg):
- try:
- payload = json.loads(msg.payload.decode())
- topic = msg.topic
- device_id = int(topic.split('/')[1].split('_')[1]) # Extract device_id number from topic
- inserted_cell = InsertedCell(device_id=device_id, **payload)
- logger.info(f"Cell inserted: {inserted_cell}")
- if device_id in self.insertion_callbacks and inserted_cell.slot_id in self.insertion_callbacks[device_id]:
- self.insertion_callbacks[device_id][inserted_cell.slot_id](inserted_cell)
- else:
- logger.warning(f"No callback for insertion {inserted_cell}")
-
- except Exception as e:
- logger.error(f"Error processing MQTT message: {e}")
- def cell_finished(self, device_id: int, slot_id: int, cell_id: int, capacity: float, status: DeviceStatus):
- """Publish a message for a cell finishing measurement"""
- if device_id not in self.devices:
- raise ValueError(f"Device {device_id} not registered")
-
- topic = f"measurement_done/{device_id}"
- payload = {
- "device_id": device_id,
- "slot_id": slot_id,
- "cell_id": cell_id,
- "capacity": round(capacity, 4),
- "status": status.name
- }
- self.client.publish(topic, json.dumps(payload))
- logger.info(f"MQTT msg published for {topic}: {payload}")
- def cleanup(self):
- """Cleanup MQTT connection"""
- self.client.loop_stop()
- self.client.disconnect()
|