"""MQTT bridge between this application and measurement devices.

For every registered device the handler subscribes to
``measurement_done/<device_id>`` and ``soa/<device_id>`` and publishes
measurement start commands on ``cells_inserted/<device_id>``.
"""

import json
import logging
from typing import Callable, Dict, List, Optional

import paho.mqtt.client as mqtt
from pydantic import BaseModel

logger = logging.getLogger(__name__)


class Device:
    """A device known to the handler, identified by ``device_id``."""

    def __init__(self, device_id: str, num_slots: int):
        self.device_id = device_id
        self.num_slots = num_slots

    def __repr__(self) -> str:
        return (
            f"{type(self).__name__}(device_id={self.device_id!r}, "
            f"num_slots={self.num_slots!r})"
        )


class MeasurementResult(BaseModel):
    """Validated JSON payload of a ``measurement_done/<device_id>`` message."""

    device_id: str
    cell_id: str
    slot: int
    capacity: float
    status: str


def _new_client() -> "mqtt.Client":
    """Create a paho client on both paho-mqtt 1.x and 2.x.

    paho-mqtt 2.x added a ``callback_api_version`` constructor argument.
    VERSION1 keeps the ``(client, userdata, flags, rc)`` ``on_connect``
    signature used by :class:`MQTTHandler`.
    """
    api_versions = getattr(mqtt, "CallbackAPIVersion", None)
    if api_versions is None:  # paho-mqtt 1.x
        return mqtt.Client()
    return mqtt.Client(api_versions.VERSION1)


class MQTTHandler:
    """Connects to an MQTT broker and routes device messages to callbacks.

    The constructor connects immediately and starts paho's background
    network loop, so ``on_connect`` and ``on_message`` run on that thread.
    """

    def __init__(self, broker="localhost", port=1883, username=None,
                 password=None, keepalive: int = 60):
        """Connect to ``broker:port`` and start the network loop.

        Args:
            broker: Host name or address of the MQTT broker.
            port: TCP port of the broker.
            username: Optional user name; credentials are only set when given.
            password: Optional password used together with ``username``.
            keepalive: Keepalive interval in seconds passed to ``connect``.

        Raises:
            Whatever ``paho`` raises from ``connect`` when the broker cannot
            be reached (for example ``OSError``).
        """
        self.client = _new_client()
        self.devices: List[Device] = []
        # device_id -> slot -> callback invoked with the MeasurementResult.
        self.measurement_callbacks: Dict[str, Dict[int, Callable]] = {}

        if username is not None:
            self.client.username_pw_set(username, password)

        # Callbacks must be installed before connecting so that the very
        # first CONNACK is already handled by on_connect.
        self.client.on_connect = self.on_connect
        self.client.on_message = self.on_message

        self.client.connect(broker, port, keepalive)
        self.client.loop_start()

    def register_device(self, device: Device):
        """Register a device and subscribe to its topics.

        Registering an already known ``device_id`` again does not duplicate
        the device and keeps callbacks that are already pending for it.
        """
        if any(d.device_id == device.device_id for d in self.devices):
            logger.warning("Device %s is already registered", device.device_id)
        else:
            self.devices.append(device)
        self.measurement_callbacks.setdefault(device.device_id, {})

        # Subscribe to device specific topics
        self._subscribe_device_topics(device.device_id)

    def _subscribe_device_topics(self, device_id: str):
        """Subscribe to all topics for a specific device."""
        topics = (
            f"measurement_done/{device_id}",
            f"soa/{device_id}",
        )
        for topic in topics:
            result, _mid = self.client.subscribe(topic)
            if result == mqtt.MQTT_ERR_SUCCESS:
                logger.info("Subscribed to %s", topic)
            else:
                # Typically "not connected yet"; on_connect subscribes all
                # registered devices again once the connection is up.
                logger.warning(
                    "Subscribing to %s failed (rc=%s); "
                    "it is retried on the next connect",
                    topic, result,
                )

    def on_connect(self, client, userdata, flags, rc):
        """paho callback: (re)subscribe every registered device on connect.

        A refused connection is logged instead of raised: this runs on
        paho's network thread, where no caller could catch the exception
        and it could terminate the loop thread.
        """
        if rc != 0:
            logger.error("Failed to connect, return code %s", rc)
            return

        logger.info("Connected to MQTT Broker!")
        # Resubscribe to all device topics on reconnect. Iterate a snapshot
        # because register_device may append from another thread.
        for device in tuple(self.devices):
            self._subscribe_device_topics(device.device_id)

    def on_message(self, client, userdata, msg):
        """paho callback: decode a message and dispatch it by topic.

        ``measurement_done/<device_id>`` payloads are validated as
        :class:`MeasurementResult` and handed to the callback registered for
        that device and slot, if any. ``soa/<device_id>`` payloads are only
        logged. Any failure is logged with its traceback and swallowed so a
        bad message cannot stop the network loop.
        """
        topic = msg.topic
        try:
            payload = json.loads(msg.payload.decode())
            # Everything after the first '/' is the device id.
            _, _, device_id = topic.partition("/")

            if topic.startswith("measurement_done/"):
                result = MeasurementResult(**payload)
                logger.info(
                    "Measurement complete for device %s, slot %s",
                    device_id, result.slot,
                )
                callback = self.measurement_callbacks.get(
                    device_id, {}
                ).get(result.slot)
                if callback is not None:
                    callback(result)
            elif topic.startswith("soa/"):
                logger.info("SOA update for device %s: %s", device_id, payload)
                # Handle SOA update here
        except Exception:
            # Top-level boundary of the network thread: log and carry on.
            logger.exception("Error processing message on topic %s", topic)

    def start_measurement(self, device_id: str, slot: int, cell_id: str,
                          callback: Optional[Callable] = None):
        """Publish measurement start command for specific device.

        Args:
            device_id: Id of a previously registered device.
            slot: Slot number sent in the command and used as callback key.
            cell_id: Cell identifier sent in the command.
            callback: Optional callable invoked with the
                :class:`MeasurementResult` for this device and slot.

        Raises:
            ValueError: If ``device_id`` has not been registered.
        """
        if not any(d.device_id == device_id for d in self.devices):
            raise ValueError(f"Device {device_id} not registered")

        # Store the callback BEFORE publishing: the reply is handled on the
        # network thread and could otherwise arrive before it is registered.
        if callback is not None:
            self.measurement_callbacks.setdefault(device_id, {})[slot] = callback

        payload = {"slot": slot, "cell_id": cell_id}
        info = self.client.publish(
            f"cells_inserted/{device_id}", json.dumps(payload)
        )
        if info.rc != mqtt.MQTT_ERR_SUCCESS:
            logger.warning(
                "Publishing start command for device %s failed (rc=%s)",
                device_id, info.rc,
            )

    def cleanup(self):
        """Stop the background network loop and disconnect from the broker."""
        self.client.loop_stop()
        self.client.disconnect()