Procházet zdrojové kódy

feat: add capacity estimation and test; ref: roll back to dict for devices; add check for empty slot in polling

Silas Gruen před 9 měsíci
rodič
revize
7db2ce8c8a

+ 13 - 11
battery_measure_ctrl/src/controllers/measurement_controller.py

@@ -1,7 +1,5 @@
 import asyncio
 import logging
-from typing import Dict
-import json
 from models.cell import Cell, CellLimits
 from models.device import Device, DeviceStatus, Slot
 from services.i2c_service import I2CService
@@ -23,18 +21,18 @@ class MeasurementController:
         self.measurement_data_task = None
 
         devices_config = config['devices']
-        self.devices = [Device(conf['id'], conf) for conf in devices_config]
+        self.devices = {conf['id']: Device(conf['id'], conf) for conf in devices_config}
         
         self.subscribe_prefix = self.config['mqtt']['subscribe_prefix']
-        for device in self.devices:
+        for _, device in self.devices.items():
             self.mqtt_service.register_device(device.id, len(device.slots), self._update_inserted_cell)
 
     async def start_polling(self):
-        """Start the polling tasks."""
+        """Start polling task."""
         self.polling_task = asyncio.create_task(self._poll_devices())
 
     async def stop_polling(self):
-        """Stop the polling tasks."""
+        """Stop polling task."""
         if self.polling_task:
             self.polling_task.cancel()
             
@@ -44,11 +42,11 @@ class MeasurementController:
         
         while True:
             await asyncio.sleep(polling_interval)
-            for device in self.devices:
+            for device_id, device in self.devices.items():
                     try:
                         # Read slot status via I2C
-                        new_status_list = self.i2c_service.request_status_list(device.i2c_address)
-                        if len(device.status_list) != len(device.slots):
+                        new_status_list = self.i2c_service.request_status_list(device.i2c_address, len(device.slots))
+                        if len(new_status_list) != len(device.slots):
                             raise IndexError(f"Invalid status list length: {len(device.status_list)} != {len(device.slots)}")
                     except Exception as e:
                         logger.error(f"Error during polling device: {device.id}:\n{str(e)}")
@@ -60,9 +58,13 @@ class MeasurementController:
                             slot = device.slots[idx]
                             prev_state = device.status_list[idx]
                             device.status_list[idx] = status
+
+                            if slot.is_empty() and status is not DeviceStatus.EMPTY:
+                                logging.warning(f"Device {device.id}, Slot {slot.id} is empty, but status is {status}")
+                                continue
                             
                             # Check for unconfigured cell                  
-                            if status is DeviceStatus.INSERTED and slot.get_cell() and not slot.get_cell().limits_transmitted:
+                            if status is DeviceStatus.INSERTED and not slot.get_cell().limits_transmitted:
                                 self._update_cell_limits(device, slot)
                                 continue
                             # Check for state transitions to "DONE"
@@ -78,7 +80,7 @@ class MeasurementController:
                                 continue
 
                         except Exception as e:
-                            logger.error(f"Error during processing device: {device.id}), slot: {slot.id}:\n{str(e)}")    
+                            logger.error(f"Error during processing status {status} for device {device.id}, slot {slot.id}: {str(e)}")    
                             continue            
                         
     def _update_cell_limits(self, device: Device, slot: Slot):

+ 71 - 10
battery_measure_ctrl/src/models/cell.py

@@ -1,5 +1,6 @@
 import logging
 from dataclasses import dataclass
+from datetime import datetime
 
 logger = logging.getLogger(__name__)
 
@@ -23,24 +24,84 @@ class Cell():
         self.limits_transmitted = False
         self.nom_capacity = nom_capacity
         self.estimated_health = estimated_health # -1.0 indicates unknown health
-        self.measurements = []
+        self.measurements: list[MeasureValues] = []
+        self.measurements_duration: list[float] = []
+
+        self.last_measured_time = None
 
     def add_measurement(self, data: MeasureValues):
         """
         Add a new measurement to the list of measurements.
         """
-        self.measurements.append(data)
-        logger.debug(f"Added measurement for cell {self.id}: {data}")
+        now = datetime.now()
+        if self.last_measured_time is not None:
+            duration = (now - self.last_measured_time).total_seconds()
+            self.measurements_duration.append(duration)
+            self.measurements.append(data)
+            logger.debug(f"Added measurement for cell {self.id}: {data} ({duration:.1f}s)")
+        self.last_measured_time = datetime.now()
+
+    def _find_cycles(self) -> list[tuple[int, float]]:
+        """
+        Find charge cycles in measurements by detecting current direction changes.
+        Returns list of (start_idx, total_duration) tuples for positive current periods.
+        """
+        if not self.measurements:
+            return []
+
+        cycles = []
+        start_idx = None
+        current_duration = 0.0
+        
+        for i in range(len(self.measurements)):
+            current = self.measurements[i].current
+            
+            # Detect start of positive current (charging)
+            if start_idx is None and current > 0:
+                start_idx = i
+                current_duration = 0.0
+            # Accumulate duration during positive current
+            elif start_idx is not None and current > 0:
+                current_duration += self.measurements_duration[i]
+            # Detect end of positive current (charging)
+            elif start_idx is not None and current <= 0:
+                cycles.append((start_idx, current_duration))
+                start_idx = None
+                
+        # Handle case where last cycle is incomplete
+        if start_idx is not None:
+            cycles.append((start_idx, current_duration))
+            
+        return cycles
 
-    def estimate_capacity(self):
+    def estimate_capacity(self) -> float:
         """
-        Estimate and update the state of maximum capacity of the cell based on current measurements.
+        Estimate cell capacity based on charge cycle durations.
+        Returns estimated capacity as percentage of nominal capacity.
         """
-        # TODO [SG]: Placeholder for capacity estimation logic
         if not self.measurements:
             logger.warning("No measurements available for capacity estimation.")
-            return
+            return -1.0
+
+        cycles = self._find_cycles()
+        if not cycles:
+            logger.warning("No charge cycles detected for capacity estimation.")
+            return -1.0
+
+        # Calculate expected cycle duration in seconds
+        est_c_rate = max(m.current for m in self.measurements) / self.nom_capacity
+        expected_duration = 3600 / est_c_rate  # seconds for one cycle (1h/c_rate)
+
+        # Calculate average cycle duration from actual measured durations
+        actual_durations = [duration for _, duration in cycles]
+        avg_duration = sum(actual_durations) / len(actual_durations)
+
+        # Calculate capacity as ratio of actual vs expected duration
+        capacity_ratio = min(1.0, avg_duration / expected_duration)
+        estimated_capacity = capacity_ratio * 100.0
+
+        logger.info(f"Cell {self.id} capacity estimation: {estimated_capacity:.1f}% "
+                   f"(found {len(cycles)} cycles, avg duration: {avg_duration:.1f} seconds)")
         
-        avg_current = sum(self.measurements) / float(len(self.measurements))
-        self.capacity = max(0.0, min(100.0, (self.nom_capacity - avg_current) / self.nom_capacity * 100.0))
-        return self.capacity
+        self.estimated_health = estimated_capacity
+        return estimated_capacity

+ 3 - 3
battery_measure_ctrl/src/services/i2c_service.py

@@ -18,14 +18,14 @@ class I2CService:
             bus_number = config.get('i2c', {}).get('bus', 1)
             self.bus = smbus2.SMBus(bus_number)
     
-    def request_status_list(self, i2c_adress: int) -> list[DeviceStatus]:
+    def request_status_list(self, i2c_adress: int, num_slots: int) -> list[DeviceStatus]:
         """Request the status of a all slots."""
         if self.debug:
-            return [DeviceStatus.INSERTED, DeviceStatus.EMPTY, DeviceStatus.EMPTY, DeviceStatus.EMPTY, DeviceStatus.EMPTY]
+            return [DeviceStatus.EMPTY for _ in range(num_slots)]
 
         status_list = self.bus.read_block_data(i2c_adress, self.status_register)
         logger.debug(f"Received status list: {status_list} (i2c_adress: {i2c_adress})")
-        return [DeviceStatus(value) for value in status_list[:8]]
+        return [DeviceStatus(value) for value in status_list[:num_slots]]
 
     def request_measure_values(self, i2c_adress: int, slot_id: int) -> MeasureValues:
         """Request the cell values of a specific slot."""

+ 2 - 2
battery_measure_ctrl/src/services/mqtt_service.py

@@ -89,8 +89,8 @@ class MQTTService:
             "device_id": device_id,
             "slot_id": slot_id,
             "cell_id": cell_id,
-            "capacity": capacity,
-            "status": status
+            "capacity": round(capacity, 4),
+            "status": status.name
         }
         self.client.publish(topic, json.dumps(payload))
         logger.info(f"MQTT msg published for {topic}: {payload}")

+ 103 - 0
battery_measure_ctrl/tests/test_cell.py

@@ -0,0 +1,103 @@
+import pytest
+from datetime import datetime, timedelta
+from battery_measure_ctrl.src.models.cell import Cell, CellLimits, MeasureValues
+
+@pytest.fixture
+def cell_limits():
+    return CellLimits(min_volt=3000, max_volt=4200, max_curr=1000)
+
+@pytest.fixture
+def test_cell(cell_limits):
+    return Cell(id=1, cell_limits=cell_limits, nom_capacity=2000.0)  # 2000mAh capacity
+
+def generate_measurement_sequence(current_pattern: list[int], 
+                               interval_seconds: float = 1.0,
+                               voltage: int = 3700,
+                               temperature: int = 25):
+    """Helper to generate a sequence of measurements with specified timing"""
+    measurements = []
+    start_time = datetime.now()
+    last_time = start_time
+    
+    for current in current_pattern:
+        measurement = MeasureValues(voltage=voltage, 
+                                  current=current, 
+                                  temperature=temperature)
+        
+        # Set up timing for this measurement
+        current_time = last_time + timedelta(seconds=interval_seconds)
+        duration = (current_time - last_time).total_seconds()
+        
+        measurements.append((measurement, duration))
+        last_time = current_time
+        
+    return measurements
+
+def test_normal_cycles(test_cell:Cell):
+    """Test capacity estimation with normal charge/discharge cycles"""
+    # Create pattern for 2 complete cycles at 0.25C (500mA)
+    # Each charge should take ~4 hours = 14400 seconds
+    current_pattern = (
+        [500] * 14400 +    # 4h charge at 0.25C
+        [-500] * 14400 +   # 4h discharge
+        [500] * 14400 +    # 4h charge
+        [-500] * 14400     # 4h discharge
+    )
+    
+    # Add measurements
+    for measurement, duration in generate_measurement_sequence(current_pattern):
+        test_cell.measurements_duration.append(duration)
+        test_cell.measurements.append(measurement)
+
+    capacity = test_cell.estimate_capacity()
+    assert 95.0 <= capacity <= 100.0, "Expected near 100% capacity for normal cycles"
+
+def test_degraded_capacity(test_cell:Cell):
+    """Test capacity estimation with shorter charging cycles (degraded cell)"""
+    # Create pattern for 2 cycles at 0.25C but with shorter duration (~75% capacity)
+    current_pattern = (
+        [500] * 10800 +    # 3h charge at 0.25C
+        [-500] * 10800 +   # 3h discharge
+        [500] * 10800 +    # 3h charge
+        [-500] * 10800     # 3h discharge
+    )
+    
+    for measurement, duration in generate_measurement_sequence(current_pattern):
+        test_cell.measurements_duration.append(duration)
+        test_cell.measurements.append(measurement)
+
+    capacity = test_cell.estimate_capacity()
+    assert 70.0 <= capacity <= 80.0, "Expected ~75% capacity for degraded cell"
+
+def test_empty_measurements(test_cell:Cell):
+    """Test capacity estimation with no measurements"""
+    capacity = test_cell.estimate_capacity()
+    assert capacity == -1.0, "Expected -1.0 for no measurements"
+
+def test_no_charge_cycles(test_cell:Cell):
+    """Test capacity estimation with no charging cycles"""
+    # Only discharge cycles
+    current_pattern = [-500] * 1000
+    
+    for measurement, duration in generate_measurement_sequence(current_pattern):
+        test_cell.measurements_duration.append(duration)
+        test_cell.measurements.append(measurement)
+
+    capacity = test_cell.estimate_capacity()
+    assert capacity == -1.0, "Expected -1.0 for no charge cycles"
+
+def test_incomplete_cycle(test_cell:Cell):
+    """Test capacity estimation with incomplete cycle"""
+    # One complete cycle and one incomplete
+    current_pattern = (
+        [500] * 14400 +    # 4h charge
+        [-500] * 14400 +   # 4h discharge
+        [500] * 7200       # 2h partial charge
+    )
+    
+    for measurement, duration in generate_measurement_sequence(current_pattern):
+        test_cell.measurements_duration.append(duration)
+        test_cell.measurements.append(measurement)
+
+    capacity = test_cell.estimate_capacity()
+    assert 70.0 <= capacity <= 80.0, "Expected capacity calculation from incomplete cycle"