import logging import paho.mqtt.client as mqtt from pydantic import BaseModel import json from typing import Dict, Callable, Optional from src.models.device import DeviceStatus logger = logging.getLogger(__name__) class InsertedCell(BaseModel): device_id: int slot_id: int cell_id: int class MQTTService: def __init__(self, config: dict): self.config = config broker_address = config['mqtt']['broker_address'] port = config['mqtt']['port'] keepalive = config['mqtt']['keepalive'] username = config['mqtt']['username'] password = config['mqtt']['password'] debug = config['mqtt']['debug'] self.client = mqtt.Client() self.client.username_pw_set(username, password) self.client.on_connect = self.on_connect self.client.on_message = self.on_message self.devices: dict[int, int] = {} self.insertion_callbacks: Dict[str, Dict[int, Callable]] = {} if debug: logger.info("No MQTT in debug mode") return if broker_address == "debug" or debug: self.client.connect("test.mosquitto.org", 1883) return try: self.client.connect(broker_address, port, keepalive) self.client.loop_start() except ConnectionRefusedError as e: raise ConnectionError(f"Failed to connect to MQTT broker at {broker_address}:{port}") from e def register_device(self, device_id, num_slots, callback: Optional[Callable] = None): """Register a new device to handle""" self.devices[device_id] = num_slots self.insertion_callbacks[device_id] = {} if callback: for slot in range(num_slots): self.insertion_callbacks[device_id][slot] = callback def _subscribe_device_topics(self, device_id: int): """Subscribe to all topics for a specific device""" topics = [ f"cells_inserted/device_{device_id}", ] for topic in topics: self.client.subscribe(topic) logger.info(f"Subscribed to {topic}") def on_connect(self, client, userdata, flags, rc): if rc == 0: logger.info("Connected to MQTT Broker!") else: raise ConnectionError(f"Failed to connect, return code {rc}") # Resubscribe to all device topics on reconnect for device_id in self.devices.keys(): self._subscribe_device_topics(device_id) def on_message(self, client, userdata, msg): try: payload = json.loads(msg.payload.decode()) topic = msg.topic device_id = int(topic.split('/')[1].split('_')[1]) # Extract device_id number from topic inserted_cell = InsertedCell(device_id=device_id, **payload) logger.info(f"Cell inserted: {inserted_cell}") if device_id in self.insertion_callbacks and inserted_cell.slot_id in self.insertion_callbacks[device_id]: self.insertion_callbacks[device_id][inserted_cell.slot_id](inserted_cell) else: logger.warning(f"No callback for insertion {inserted_cell}") except Exception as e: logger.error(f"Error processing MQTT message: {e}") def cell_finished(self, device_id: int, slot_id: int, cell_id: int, capacity: float, status: DeviceStatus): """Publish a message for a cell finishing measurement""" if device_id not in self.devices: raise ValueError(f"Device {device_id} not registered") topic = f"measurement_done/{device_id}" payload = { "device_id": device_id, "slot_id": slot_id, "cell_id": cell_id, "capacity": round(capacity, 4), "status": status.name } self.client.publish(topic, json.dumps(payload)) logger.info(f"MQTT msg published for {topic}: {payload}") def cleanup(self): """Cleanup MQTT connection""" self.client.loop_stop() self.client.disconnect()