Parcourir la source

refactor: error handling; fix: http request handling (tested)

Silas Gruen il y a 9 mois
Parent
commit
54da7c9a73

+ 4 - 4
battery_measure_ctrl/config/config.yaml

@@ -1,5 +1,5 @@
 mqtt:
-  debug: true
+  debug: false
   broker_address: "localhost"
   port: 1883
   subscribe_prefix: "cells_inserted"
@@ -17,10 +17,10 @@ i2c:
   timeout_ms: 100  # Timeout for I2C communications
 
 http:
-  debug: true
-  server_url: "https://batteries.up-cell.de/cells/2224/"
+  debug: false
+  server_url: "https://batteries.up-cell.de"
   timeout: 5
-  endpoint: "/cells"
+  endpoint: "cells"
   username: "test"
   password: "123"
 

+ 2 - 3
battery_measure_ctrl/requirements.txt

@@ -1,10 +1,9 @@
 paho-mqtt==1.6.1
 smbus2==0.4.2
-smbus==1.1.post2
 aiohttp==3.8.5
 numpy==1.24.3
 pyyaml==6.0.1
 pytest==7.4.0
 pytest-asyncio==0.21.1
-Flask==2.3.3
-requests==2.32.3
+requests==2.32.3
+pydantic==2.27.2

+ 63 - 75
battery_measure_ctrl/src/controllers/measurement_controller.py

@@ -2,11 +2,11 @@ import asyncio
 import logging
 from typing import Dict
 import json
-from models.cell import Cell, MeasureValues
+from models.cell import Cell, CellLimits
 from models.device import Device, DeviceStatus, Slot
 from services.i2c_service import I2CService
 from services.http_service import HTTPService
-from services.mqtt_service import MQTTService
+from services.mqtt_service import MQTTService, InsertedCell
 
 logger = logging.getLogger(__name__)
 
@@ -27,12 +27,7 @@ class MeasurementController:
         
         self.subscribe_prefix = self.config['mqtt']['subscribe_prefix']
         for device in self.devices:
-            self.setup_mqtt_subscription(device.id)
-
-    def setup_mqtt_subscription(self, device_id):
-        """Setup MQTT subscriptions for each device."""
-        topic = f"{self.subscribe_prefix}/device_{device_id}"
-        self.mqtt_service.add_message_handler(topic, lambda client, userdata, msg, dev_id=device_id: self._handle_cell_insertion(client, userdata, msg, dev_id))
+            self.mqtt_service.register_device(device.id, len(device.slots), self._update_inserted_cell)
 
     async def start_polling(self):
         """Start the polling tasks."""
@@ -49,72 +44,68 @@ class MeasurementController:
         
         while True:
             await asyncio.sleep(polling_interval)
-            try:
-                for device in self.devices:
-                    # Read slot status via I2C
-                    new_status_list = await self.i2c_service.request_status_list(device.i2c_address)
-                    if len(device.status_list) != len(device.slots):
-                        raise IndexError(f"Invalid status list length: {len(device.status_list)} != {len(device.slots)}")
+            for device in self.devices:
+                    try:
+                        # Read slot status via I2C
+                        new_status_list = self.i2c_service.request_status_list(device.i2c_address)
+                        if len(device.status_list) != len(device.slots):
+                            raise IndexError(f"Invalid status list length: {len(device.status_list)} != {len(device.slots)}")
+                    except Exception as e:
+                        logger.error(f"Error during polling device: {device.id}:\n{str(e)}")
+                        continue
                     
                     # Change the (change of) status for each slot and act accordingly
                     for idx, status in enumerate(new_status_list):
-                        slot = device.slots[idx]
-                        prev_state = device.status_list[idx]
-                        device.status_list[idx] = status
-                        
-                        # Check for state transitions to "INSERTED"
-                        if status is DeviceStatus.INSERTED and prev_state is not DeviceStatus.INSERTED:
-                            self._update_cell_limits(device, slot)
-                            continue
-                        # Check for state transitions to "DONE"
-                        if prev_state is DeviceStatus.MEASURING and status is DeviceStatus.DONE:
-                            self._process_done(device, slot)
-                            continue
-                        # Check for state transitions to "ERROR"
-                        if status is DeviceStatus.ERROR and prev_state is not DeviceStatus.ERROR:
-                            logger.error(f"Error detected for device {device.id}, slot {slot.id}")
-                            continue
-                        if status is DeviceStatus.MEASURING:
-                            self._collect_measurement(device, slot)
-                            continue
+                        try:
+                            slot = device.slots[idx]
+                            prev_state = device.status_list[idx]
+                            device.status_list[idx] = status
+                            
+                            # Check for unconfigured cell                  
+                            if status is DeviceStatus.INSERTED and slot.get_cell() and not slot.get_cell().limits_transmitted:
+                                self._update_cell_limits(device, slot)
+                                continue
+                            # Check for state transitions to "DONE"
+                            if prev_state is DeviceStatus.MEASURING and status is DeviceStatus.DONE:
+                                self._process_done(device, slot)
+                                continue
+                            # Check for state transitions to "ERROR"
+                            if status is DeviceStatus.ERROR and prev_state is not DeviceStatus.ERROR:
+                                self._process_error(device, slot)
+                                continue
+                            if status is DeviceStatus.MEASURING:
+                                self._collect_measurement(device, slot)
+                                continue
 
-            except Exception as e:
-                logger.error(f"Error during device polling: {str(e)}")                
-            
-    async def _update_cell_limits(self, device: Device, slot: Slot):
+                        except Exception as e:
+                            logger.error(f"Error during processing device: {device.id}), slot: {slot.id}:\n{str(e)}")    
+                            continue            
+                        
+    def _update_cell_limits(self, device: Device, slot: Slot):
         """Send battery limits to the device."""
-        cell_id = slot.get_cell().id
-        limits = self.http_service.fetch_cell_info(cell_id)
-        self.i2c_service.send_cell_limits(device.i2c_address, slot, limits)
+        cell = slot.get_cell()
+        if cell is None:
+            raise ValueError(f"No cell inserted in device {device.id}, slot {slot.id}")
+        self.i2c_service.send_cell_limits(device.i2c_address, slot.id, cell.limits)
+        cell.limits_transmitted = True
 
-    async def _collect_measurement(self, device: Device, slot: Slot):
+    def _collect_measurement(self, device: Device, slot: Slot):
         """Collect measurement data from active slots."""
-        try:
-            measure_values = await self.i2c_service.request_measure_values(device.i2c_address, slot.id)  
-            slot.get_cell().add_measurement(measure_values)
-                    
-        except Exception as e:
-            logger.error(f"Error collecting measurement data: {str(e)}")            
-
-    def _handle_cell_insertion(self, client, userdata, message: str, device_id: int):
-        """Handle MQTT message for cell insertion."""
-        try:
-            data = json.loads(message.payload)
-            slot = data.get('slot')
-            cell_id = data.get('cell_id')
-            
-            if slot is None or cell_id is None:
-                logger.error(f"Invalid message format: {message.payload}")
-                return
+        measure_values = self.i2c_service.request_measure_values(device.i2c_address, slot.id)
+        cell = slot.get_cell()
+        if cell is None:
+            raise ValueError(f"No cell inserted in device {device.id}, slot {slot.id}")  
+        cell.add_measurement(measure_values)
 
-            # Create and schedule the measurement task
-            self._start_measurement(device_id, slot, cell_id)
-            logger.info(f"Initiated measurement for device {device_id}, slot {slot}, cell {cell_id}")
-            
-        except json.JSONDecodeError:
-            logger.error(f"Invalid JSON in MQTT message: {message}")
-        except Exception as e:
-            logger.error(f"Error handling cell insertion: {str(e)}")
+    def _update_inserted_cell(self, insertion_info: InsertedCell):
+        """Update the inserted cell id for a device."""
+        cell_info = self.http_service.fetch_cell_info(insertion_info.cell_id)
+        min_volt = max(cell_info['cell_type']['min_voltage'], self.config['measurement']['min_voltage'])
+        max_volt = min(cell_info['cell_type']['max_voltage'], self.config['measurement']['max_voltage'])
+        max_current = cell_info['cell_type']['capacity'] * self.config['measurement']['c_rate']
+        limits = CellLimits(min_volt, max_volt, max_current)
+        nom_capacity = cell_info['cell_type']['capacity']
+        self.devices[insertion_info.device_id].slots[insertion_info.slot_id].insert_cell(Cell(insertion_info.cell_id, limits, nom_capacity))
         
     def _process_done(self, device: Device, slot: Slot):
         """Execute measurement cycles for a Cell."""
@@ -124,15 +115,12 @@ class MeasurementController:
         estimated_capacity = cell.estimate_capacity()
         
         logger.info(f"Measurement complete for cell {cell.id}. Estimated capacity: {estimated_capacity}%")
+        self.mqtt_service.cell_finished(device.id, slot.id, cell.id, estimated_capacity, DeviceStatus.DONE)
+        slot.remove_cell()
 
-        # Publish completion message
-        topic = f"{self.config['mqtt']['measurement_done_topic']}/device_{device.id}/slot_{slot}"
-        self.mqtt_service.publish(topic, json.dumps({
-            "cell_id": cell.id,
-            "device_id": device.id,
-            "slot_id": slot.id,
-            "capacity": estimated_capacity,
-            "status": DeviceStatus.DONE.name
-        }))
+    def _process_error(self, device: Device, slot: Slot):
+        """Handle errors during measurement."""
+        logger.error(f"Error detected for device {device.id}, slot {slot.id}")
+        self.mqtt_service.cell_finished(device.id, slot.id, slot.get_cell().id, 0.0, DeviceStatus.ERROR)
         slot.remove_cell()
                 

+ 1 - 4
battery_measure_ctrl/src/main.py

@@ -39,9 +39,6 @@ async def main():
     http_service = HTTPService(config)
     controller = MeasurementController(config, i2c_service, http_service, mqtt_service)
 
-    mqtt_service.connect()
-    mqtt_service.loop_start()
-
     await controller.start_polling()
 
     try:
@@ -50,7 +47,7 @@ async def main():
             await asyncio.sleep(1)
     finally:
         logging.info("Shutting down...")
-        mqtt_service.disconnect()
+        mqtt_service.cleanup()
 
 if __name__ == "__main__":
     asyncio.run(main())

+ 3 - 2
battery_measure_ctrl/src/models/cell.py

@@ -17,9 +17,10 @@ class CellLimits:
 
 class Cell():
     
-    def __init__(self, id: int, min_volt: float, max_volt: float, max_curr: float, nom_capacity: float, estimated_health: float=-1.0):
+    def __init__(self, id: int, cell_limits: CellLimits, nom_capacity: float, estimated_health: float=-1.0):
         self.id = id
-        self.limits = CellLimits(min_volt, max_volt, max_curr)
+        self.limits = cell_limits
+        self.limits_transmitted = False
         self.nom_capacity = nom_capacity
         self.estimated_health = estimated_health # -1.0 indicates unknown health
         self.measurements = []

+ 6 - 7
battery_measure_ctrl/src/services/http_service.py

@@ -1,5 +1,3 @@
-import json
-from models.cell import CellLimits
 import requests
 import logging
 
@@ -34,17 +32,19 @@ class HTTPService:
     def __init__(self, config: dict):
         self.config = config
         self.debug = config['http'].get('debug', False)
-        self.base_url = config['http'].get('url')
+        self.base_url = config['http'].get('server_url')
         self.endpoint = config['http'].get('endpoint')
+        self.username = config['http'].get('username')
+        self.password = config['http'].get('password')
 
-    def fetch_cell_info(self, cell_id, username, password):      
+    def fetch_cell_info(self, cell_id):      
         if self.debug:
             return DEBUG_DATA
         
         url = f"{self.base_url}/{self.endpoint}/{cell_id}/"
         
         # Basic Authentication (if required)
-        auth = (username, password)
+        auth = (self.username, self.password)
         
         # Headers
         headers = {
@@ -58,5 +58,4 @@ class HTTPService:
         if response.status_code == 200:
             return response.json()  # Return parsed JSON
         else:
-            logger.error(f"Request failed with status {response.status_code}")
-            return None
+            raise ConnectionError(f"Info for cell {cell_id} could not be retreived: {response.status_code}")

+ 16 - 4
battery_measure_ctrl/src/services/i2c_service.py

@@ -1,7 +1,9 @@
 import smbus2
-import time
 from models.cell import CellLimits, MeasureValues
 from models.device import DeviceStatus
+import logging
+
+logger = logging.getLogger(__name__)
 
 class I2CService:
     status_register = 0x01
@@ -11,23 +13,33 @@ class I2CService:
     def __init__(self, config: dict):
         self.config = config
         self.debug = config['i2c'].get('debug', False)
+        self.bus = None
         if not self.debug:
             bus_number = config.get('i2c', {}).get('bus', 1)
             self.bus = smbus2.SMBus(bus_number)
     
-    async def request_status_list(self, i2c_adress: int) -> list[DeviceStatus]:
+    def request_status_list(self, i2c_adress: int) -> list[DeviceStatus]:
         """Request the status of a all slots."""
+        if self.debug:
+            return [DeviceStatus.INSERTED, DeviceStatus.EMPTY, DeviceStatus.EMPTY, DeviceStatus.EMPTY, DeviceStatus.EMPTY]
 
         status_list = self.bus.read_block_data(i2c_adress, self.status_register)
+        logger.debug(f"Received status list: {status_list} (i2c_adress: {i2c_adress})")
         return [DeviceStatus(value) for value in status_list[:8]]
 
-    async def request_measure_values(self, i2c_adress: int, slot_id: int) -> MeasureValues:
+    def request_measure_values(self, i2c_adress: int, slot_id: int) -> MeasureValues:
         """Request the cell values of a specific slot."""
+        if self.debug:
+            return MeasureValues(4.2, 3.6, 1.5)
         measure_values:MeasureValues = self.bus.read_block_data(i2c_adress, self.cell_data_register, slot_id) # TODO [SG]: How do i specify the slot?
+        logger.debug(f"Received measure values: {measure_values} (i2c_adress: {i2c_adress}, slot_id: {slot_id})")
         return measure_values
     
-    async def send_cell_limits(self, i2c_adress: int, slot_id: int, limits: CellLimits) -> bool:
+    def send_cell_limits(self, i2c_adress: int, slot_id: int, limits: CellLimits) -> bool:
         """Send the battery limits to the device."""
+        if self.debug:
+            return True
         limit_list = [slot_id, limits.min_volt, limits.max_volt, limits.max_curr]
         self.bus.write_block_data(i2c_adress, self.status_register, limit_list)
+        logger.debug(f"Sent cell limits {limit_list} (i2c_adress: {i2c_adress}, slot_id: {slot_id})")
         return True

+ 76 - 49
battery_measure_ctrl/src/services/mqtt_service.py

@@ -1,74 +1,101 @@
 import logging
 import paho.mqtt.client as mqtt
+from pydantic import BaseModel
 import json
 from typing import Dict, Callable
+from models.device import DeviceStatus
 
 logger = logging.getLogger(__name__)
 
+class InsertedCell(BaseModel):
+    device_id: int
+    slot_id: int
+    cell_id: int    
+
 class MQTTService:
     def __init__(self, config: dict):
         self.config = config
-        # self.debug = config['mqtt'].get('debug', False)
-        self.broker_address = config['mqtt']['broker_address']
-        self.username = config['mqtt'].get('username')
-        self.password = config['mqtt'].get('password')
-        self.publish_topics = []
-        self.message_handlers: Dict[str, Callable] = {}
+        broker_address = config['mqtt']['broker_address']
+        port = config['mqtt']['port']
+        keepalive = config['mqtt']['keepalive']
+        username = config['mqtt']['username']
+        password = config['mqtt']['password']
         
-        # Initialize MQTT client with credentials
         self.client = mqtt.Client()
-        if self.username and self.password:
-            self.client.username_pw_set(self.username, self.password)
+        self.client.username_pw_set(username, password)
         self.client.on_connect = self.on_connect
+        self.client.on_message = self.on_message
+        
+        if broker_address == "debug":
+            self.client.connect("test.mosquitto.org", 1883)
+            return        
 
-    def connect(self):
-        try:
-            self.client.connect(self.broker_address)
-        except Exception as e:
-            logger.error(f"Failed to connect to MQTT broker: {str(e)}")
-            raise
+        self.devices: dict[int: int] = {}
+        self.insertion_callbacks: Dict[str, Dict[int, Callable]] = {}
+        
+        self.client.connect(broker_address, port, keepalive)
+        self.client.loop_start()
 
-    def disconnect(self):
-        self.client.disconnect()
+    def register_device(self, device_id, num_slots, callback: Callable = None):
+        """Register a new device to handle"""
+        self.devices[device_id] = num_slots
+        self.insertion_callbacks[device_id] = {}
+        if callback:
+            for slot in range(num_slots):
+                self.insertion_callbacks[device_id][slot] = callback
+
+    def _subscribe_device_topics(self, device_id: str):
+        """Subscribe to all topics for a specific device"""
+        topics = [
+            f"cells_inserted/device_{device_id}",
+        ]
+        for topic in topics:
+            self.client.subscribe(topic)
+            logger.info(f"Subscribed to {topic}")
 
     def on_connect(self, client, userdata, flags, rc):
         if rc == 0:
-            logger.info("Successfully connected to MQTT broker")
-            self.subscribe_defined()
-        elif rc == 1:
-            logger.error("Connection refused - incorrect protocol version")
-        elif rc == 2:
-            logger.error("Connection refused - invalid client identifier")
-        elif rc == 3:
-            logger.error("Connection refused - server unavailable")
-        elif rc == 4:
-            logger.error("Connection refused - bad username or password")
-        elif rc == 5:
-            logger.error("Connection refused - not authorised")
+            logger.info("Connected to MQTT Broker!")
         else:
-            logger.error(f"Connection failed with code {rc}")
+            raise ConnectionError(f"Failed to connect, return code {rc}")
 
-    def add_message_handler(self, topic: str, handler: Callable):
-        self.message_handlers[topic] = handler
+        # Resubscribe to all device topics on reconnect
+        for device_id in self.devices.keys():
+            self._subscribe_device_topics(device_id)
 
-    def subscribe_defined(self):
-        for topic, callback in self.message_handlers.items():
-            logger.debug(f"Subscribing to MQTT topic: {topic}")
-            self.client.subscribe(topic)
-            self.client.message_callback_add(topic, callback)
+    def on_message(self, client, userdata, msg):
+        try:
+            payload = json.loads(msg.payload.decode())
+            topic = msg.topic
+            device_id = int(topic.split('/')[1].split('_')[1])  # Extract device_id number from topic
 
-    def publish(self, topic: str, message: str):
-        # if self.debug:
-        #     logger.info(f"Debug MQTT publish to {topic}: {message}")
-        #     return
-        if topic in self.publish_topics:
-            logger.info(f"MQTT publish to {topic}: {message}")
-            self.client.publish(topic, json.dumps(message))
-        else:
-            raise ValueError(f"Topic {topic} not in configured publish topics")
+            inserted_cell = InsertedCell(device_id=device_id, **payload)
+            logger.info(f"Cell inserted: {inserted_cell}")
+            if device_id in self.insertion_callbacks and inserted_cell.slot_id in self.insertion_callbacks[device_id]:
+                self.insertion_callbacks[device_id][inserted_cell.slot_id](inserted_cell)
+            else:
+                logger.warning(f"No callback for insertion {inserted_cell}")
+                
+        except Exception as e:
+            logger.error(f"Error processing MQTT message: {e}")
 
-    def loop_start(self):
-        self.client.loop_start()
+    def cell_finished(self, device_id: str, slot_id: int, cell_id: int, capacity: float, status: DeviceStatus):
+        """Publish a message for a cell finishing measurement"""
+        if device_id not in self.devices:
+            raise ValueError(f"Device {device_id} not registered")
+        
+        topic = f"measurement_done/{device_id}"
+        payload = {
+            "device_id": device_id,
+            "slot_id": slot_id,
+            "cell_id": cell_id,
+            "capacity": capacity,
+            "status": status
+        }
+        self.client.publish(topic, json.dumps(payload))
+        logger.info(f"MQTT msg published for {topic}: {payload}")
 
-    def loop_stop(self):
+    def cleanup(self):
+        """Cleanup MQTT connection"""
         self.client.loop_stop()
+        self.client.disconnect()