routes.py 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596
  1. import paho.mqtt.client as mqtt
  2. from typing import Dict, List, Callable
  3. from pydantic import BaseModel
  4. import json
  5. import logging
  6. logger = logging.getLogger(__name__)
  7. class Device:
  8. def __init__(self, device_id: str, num_slots: int):
  9. self.device_id = device_id
  10. self.num_slots = num_slots
  11. class MeasurementResult(BaseModel):
  12. device_id: str
  13. cell_id: str
  14. slot: int
  15. capacity: float
  16. status: str
  17. class MQTTHandler:
  18. def __init__(self, broker="localhost", port=1883, username=None, password=None):
  19. self.client = mqtt.Client()
  20. self.devices: List[Device] = []
  21. self.measurement_callbacks: Dict[str, Dict[int, Callable]] = {}
  22. self.client.username_pw_set(username, password)
  23. self.client.on_connect = self.on_connect
  24. self.client.on_message = self.on_message
  25. self.client.connect(broker, port, 60)
  26. self.client.loop_start()
  27. def register_device(self, device: Device):
  28. """Register a new device to handle"""
  29. self.devices.append(device)
  30. self.measurement_callbacks[device.device_id] = {}
  31. # Subscribe to device specific topics
  32. self._subscribe_device_topics(device.device_id)
  33. def _subscribe_device_topics(self, device_id: str):
  34. """Subscribe to all topics for a specific device"""
  35. topics = [
  36. f"measurement_done/{device_id}",
  37. f"soa/{device_id}"
  38. ]
  39. for topic in topics:
  40. self.client.subscribe(topic)
  41. logger.info(f"Subscribed to {topic}")
  42. def on_connect(self, client, userdata, flags, rc):
  43. if rc == 0:
  44. logger.info("Connected to MQTT Broker!")
  45. else:
  46. raise ConnectionError(f"Failed to connect, return code {rc}")
  47. # Resubscribe to all device topics on reconnect
  48. for device in self.devices:
  49. self._subscribe_device_topics(device.device_id)
  50. def on_message(self, client, userdata, msg):
  51. try:
  52. payload = json.loads(msg.payload.decode())
  53. topic = msg.topic
  54. device_id = topic.split('/')[1] # Extract device_id from topic
  55. if topic.startswith("measurement_done/"):
  56. result = MeasurementResult(**payload)
  57. logger.info(f"Measurement complete for device {device_id}, slot {result.slot}")
  58. if device_id in self.measurement_callbacks and result.slot in self.measurement_callbacks[device_id]:
  59. self.measurement_callbacks[device_id][result.slot](result)
  60. elif topic.startswith("soa/"):
  61. logger.info(f"SOA update for device {device_id}: {payload}")
  62. # Handle SOA update here
  63. except Exception as e:
  64. logger.error(f"Error processing message: {e}")
  65. def start_measurement(self, device_id: str, slot: int, cell_id: str, callback: Callable = None):
  66. """Publish measurement start command for specific device"""
  67. if device_id not in [d.device_id for d in self.devices]:
  68. raise ValueError(f"Device {device_id} not registered")
  69. payload = {"slot": slot, "cell_id": cell_id}
  70. self.client.publish(f"cells_inserted/{device_id}", json.dumps(payload))
  71. if callback:
  72. if device_id not in self.measurement_callbacks:
  73. self.measurement_callbacks[device_id] = {}
  74. self.measurement_callbacks[device_id][slot] = callback
  75. def cleanup(self):
  76. """Cleanup MQTT connection"""
  77. self.client.loop_stop()
  78. self.client.disconnect()