Parcourir la source

feat: adapt I2C to new API. Add cycle_num to new measurement

Silas Gruen il y a 6 mois
Parent
commit
a3dc0076b5

+ 2 - 1
config/config.yaml

@@ -17,7 +17,7 @@ i2c:
   timeout_ms: 100  # Timeout for I2C communications
 
 http:
-  debug: false
+  debug: true
   server_url: "https://batteries.up-cell.de"
   timeout: 5
   endpoint: "cells"
@@ -27,6 +27,7 @@ http:
 measurement:
   cycles: 3
   c_rate: 0.25
+  cut_off_curr: 100
   sample_rate_hz: 1
   min_voltage: 2.5
   max_voltage: 4.2

+ 18 - 10
src/controllers/measurement_controller.py

@@ -65,13 +65,16 @@ class MeasurementController:
                                 logging.warning(f"Device {device.id}, Slot {slot.id} is empty, but status is {status}")
                                 continue
                             
-                            # Check for unconfigured cell                  
-                            if status is DeviceStatus.INSERTED and not slot.get_cell().limits_transmitted:
+                            # Check for unconfigured cell (could also be when the device resets)    
+                            cell = slot.get_cell()  
+                            if status is DeviceStatus.INSERTED and cell and not cell.limits_transmitted:
                                 self._update_cell_limits(device, slot)
+                                cell.limits_transmitted = True
                                 continue
                             # Check for state transitions to "DONE"
-                            if prev_state is DeviceStatus.MEASURING and status is DeviceStatus.DONE:
+                            if prev_state is DeviceStatus.MEASURING and status is DeviceStatus.DONE and cell:
                                 self._process_done(device, slot)
+                                cell.limits_transmitted = False
                                 continue
                             # Check for state transitions to "ERROR"
                             if status is DeviceStatus.ERROR and prev_state is not DeviceStatus.ERROR:
@@ -91,7 +94,6 @@ class MeasurementController:
         if cell is None:
             raise ValueError(f"No cell inserted in device {device.id}, slot {slot.id}")
         self.i2c_service.send_cell_limits(device.i2c_address, slot.id, cell.limits)
-        cell.limits_transmitted = True
 
     def _collect_measurement(self, device: Device, slot: Slot):
         """Collect measurement data from active slots."""
@@ -106,15 +108,19 @@ class MeasurementController:
         cell_info = self.http_service.fetch_cell_info(insertion_info.cell_id)
         min_volt = max(cell_info['cell_type']['min_voltage'], self.config['measurement']['min_voltage'])
         max_volt = min(cell_info['cell_type']['max_voltage'], self.config['measurement']['max_voltage'])
-        max_current = cell_info['cell_type']['capacity'] * self.config['measurement']['c_rate']
-        limits = CellLimits(min_volt, max_volt, max_current)
-        nom_capacity = cell_info['cell_type']['capacity']
-        self.devices[insertion_info.device_id].slots[insertion_info.slot_id].insert_cell(Cell(insertion_info.cell_id, limits, nom_capacity))
+        charge_fraction = self.config['measurement']['c_rate']
+        capacity = cell_info['cell_type']['capacity']
+        cut_off_curr = self.config['measurement']['cut_off_curr']
+        limits = CellLimits(min_volt, max_volt, charge_fraction, capacity, cut_off_curr, 0)
+        self.devices[insertion_info.device_id].slots[insertion_info.slot_id].insert_cell(Cell(insertion_info.cell_id, limits, capacity))
         
     def _process_done(self, device: Device, slot: Slot):
         """Execute measurement cycles for a Cell."""
 
         cell = slot.get_cell()
+        if cell is None:
+            raise ValueError(f"No cell inserted in device {device.id}, slot {slot.id}")
+        
         # Calculate health and capacity
         estimated_capacity = cell.estimate_capacity()
         
@@ -124,7 +130,9 @@ class MeasurementController:
 
     def _process_error(self, device: Device, slot: Slot):
         """Handle errors during measurement."""
-        logger.error(f"Error detected for device {device.id}, slot {slot.id}")
-        self.mqtt_service.cell_finished(device.id, slot.id, slot.get_cell().id, 0.0, DeviceStatus.ERROR)
+        cell = slot.get_cell()
+        cell_id = cell.id if cell else -1
+        logger.error(f"Error detected for device {device.id}, slot {slot.id}, cell {cell_id}")
+        self.mqtt_service.cell_finished(device.id, slot.id, cell_id, 0.0, DeviceStatus.ERROR)
         slot.remove_cell()
                 

+ 8 - 2
src/models/cell.py

@@ -10,12 +10,17 @@ class MeasureValues:
     voltage: int
     current: int
     temperature: int
+    cycle_num: int
+    cycle_state: int
 
 @dataclass
 class CellLimits:
     min_volt: int
     max_volt: int
-    max_curr: int
+    charge_fraction: int
+    capacity: int
+    cut_off_curr: int
+    cycle_num: int
 
 class Cell():
     
@@ -32,8 +37,9 @@ class Cell():
 
     def add_measurement(self, data: MeasureValues):
         """
-        Add a new measurement to the list of measurements and update Prometheus metrics.
+        Add a new measurement to the list of measurements and update Prometheus metrics. Update the cycle number in limits.
         """
+        self.limits.cycle_num = data.cycle_num
         now = datetime.now()
         if self.last_measured_time is not None:
             duration = (now - self.last_measured_time).total_seconds()

+ 4 - 3
src/models/device.py

@@ -1,5 +1,6 @@
 from .cell import Cell
 from enum import Enum
+from typing import Optional
 
 class DeviceStatus(Enum):
     EMPTY = 0
@@ -17,7 +18,7 @@ class Device():
         :param slots: Number of slots available in the device.
         """
         self.id: int = id
-        self.i2c_address: str = config['i2c_address']
+        self.i2c_address: int = config['i2c_address']
         self.slots = [Slot(idx) for idx in range(config['num_slots'])]
         self.status_list = [DeviceStatus.EMPTY for _ in range(len(self.slots))]
     
@@ -27,7 +28,7 @@ class Device():
 class Slot():
     def __init__(self, id: int):
         self.id = id
-        self.curr_cell: Cell = None
+        self.curr_cell: Optional[Cell] = None
 
     def insert_cell(self, cell: Cell):
         self.curr_cell = cell
@@ -38,5 +39,5 @@ class Slot():
     def is_empty(self) -> bool:
         return self.curr_cell is None
 
-    def get_cell(self) -> Cell:
+    def get_cell(self) -> Optional[Cell]:
         return self.curr_cell

+ 25 - 6
src/services/i2c_service.py

@@ -22,6 +22,8 @@ class I2CService:
         """Request the status of a all slots."""
         if self.debug:
             return [DeviceStatus.EMPTY for _ in range(num_slots)]
+        if not self.bus:
+            raise RuntimeError("I2C bus is not initialized. Check if debug mode is enabled.")
 
         logger.debug(f"Requesting status list (i2c_adress: {i2c_adress}, register: {self.status_register})")
         status_list = self.bus.read_i2c_block_data(i2c_adress, self.status_register, num_slots)
@@ -31,9 +33,20 @@ class I2CService:
     def request_measure_values(self, i2c_adress: int, slot_id: int) -> MeasureValues:
         """Request the cell values of a specific slot."""
         if self.debug:
-            return MeasureValues(4.2, 3.6, 1.5)
-        response = self.bus.read_i2c_block_data(i2c_adress, self.cell_data_register, 3, slot_id) # TODO [SG]: How do i specify the slot?
-        measure_values = MeasureValues(*response)
+            return MeasureValues(4, 2, 20, 1, 1)
+        if not self.bus:
+            raise RuntimeError("I2C bus is not initialized. Check if debug mode is enabled.")
+
+        # Use the same slot addressing as in i2c_playground.py
+        slot_request = self.cell_data_register << 4 | slot_id
+        response = self.bus.read_i2c_block_data(i2c_adress, slot_request, 8)
+        # Unpack values as in playground
+        voltage = int.from_bytes(response[0:2], byteorder='little')
+        current = int.from_bytes(response[2:4], byteorder='little')
+        temperature = int.from_bytes(response[4:6], byteorder='little')
+        cycle_num = int.from_bytes(response[6:7], byteorder='little')
+        cycle_state = int.from_bytes(response[7:8], byteorder='little')
+        measure_values = MeasureValues(voltage, current, temperature, cycle_num, cycle_state)
         logger.debug(f"Received measure values: {measure_values} (i2c_adress: {i2c_adress}, slot_id: {slot_id})")
         return measure_values
     
@@ -41,7 +54,13 @@ class I2CService:
         """Send the battery limits to the device."""
         if self.debug:
             return True
-        limit_list = [slot_id, limits.min_volt, limits.max_volt, limits.max_curr]
-        self.bus.write_block_data(i2c_adress, self.status_register, limit_list)
-        logger.debug(f"Sent cell limits {limit_list} (i2c_adress: {i2c_adress}, slot_id: {slot_id})")
+        if not self.bus:
+            raise RuntimeError("I2C bus is not initialized. Check if debug mode is enabled.")
+
+        # Pack limits as 16-bit values, little-endian, as in playground
+        msg_bytes = bytearray([slot_id])
+        for value in [limits.min_volt, limits.max_volt, limits.charge_fraction, limits.capacity, limits.cut_off_curr, limits.cycle_num]:
+            msg_bytes.extend(value.to_bytes(2, byteorder='little'))
+        self.bus.write_i2c_block_data(i2c_adress, self.battery_limit_register, msg_bytes)
+        logger.debug(f"Sent cell limits (bytes): {list(msg_bytes)} (i2c_adress: {i2c_adress}, slot_id: {slot_id})")
         return True

+ 5 - 5
src/services/mqtt_service.py

@@ -2,7 +2,7 @@ import logging
 import paho.mqtt.client as mqtt
 from pydantic import BaseModel
 import json
-from typing import Dict, Callable
+from typing import Dict, Callable, Optional
 from src.models.device import DeviceStatus
 
 logger = logging.getLogger(__name__)
@@ -27,7 +27,7 @@ class MQTTService:
         self.client.on_connect = self.on_connect
         self.client.on_message = self.on_message
 
-        self.devices: dict[int: int] = {}
+        self.devices: dict[int, int] = {}
         self.insertion_callbacks: Dict[str, Dict[int, Callable]] = {}
 
         if debug:
@@ -41,7 +41,7 @@ class MQTTService:
         self.client.connect(broker_address, port, keepalive)
         self.client.loop_start()
 
-    def register_device(self, device_id, num_slots, callback: Callable = None):
+    def register_device(self, device_id, num_slots, callback: Optional[Callable] = None):
         """Register a new device to handle"""
         self.devices[device_id] = num_slots
         self.insertion_callbacks[device_id] = {}
@@ -49,7 +49,7 @@ class MQTTService:
             for slot in range(num_slots):
                 self.insertion_callbacks[device_id][slot] = callback
 
-    def _subscribe_device_topics(self, device_id: str):
+    def _subscribe_device_topics(self, device_id: int):
         """Subscribe to all topics for a specific device"""
         topics = [
             f"cells_inserted/device_{device_id}",
@@ -84,7 +84,7 @@ class MQTTService:
         except Exception as e:
             logger.error(f"Error processing MQTT message: {e}")
 
-    def cell_finished(self, device_id: str, slot_id: int, cell_id: int, capacity: float, status: DeviceStatus):
+    def cell_finished(self, device_id: int, slot_id: int, cell_id: int, capacity: float, status: DeviceStatus):
         """Publish a message for a cell finishing measurement"""
         if device_id not in self.devices:
             raise ValueError(f"Device {device_id} not registered")