Prechádzať zdrojové kódy

fix: update I2C service and playground for new API

Silas Gruen 6 mesiacov pred
rodič
commit
12965a5a56

+ 1 - 1
config/config.yaml

@@ -12,7 +12,7 @@ mqtt:
 i2c:
   debug: false
   bus_number: 1
-  polling_interval_ms: 100  # How often to poll devices
+  polling_interval_ms: 100  # Pause between polling I2C devices
   retry_count: 3  # Number of retries for failed I2C communications # TODO implement
   timeout_ms: 100  # Timeout for I2C communications # TODO implement
 

+ 29 - 21
playgrounds/i2c_playground.py

@@ -13,14 +13,14 @@ def main():
     i2c_service = I2CService(config)
     
     # Test parameters
-    i2c_address = 0x48
-    num_slots = 1
+    i2c_address = 0x20
+    num_slots = 4
     
     try:
         # Request status list from the device
         # status_list = i2c_service.request_status_list(i2c_address, num_slots)
-        status_list = i2c_service.request_measure_values(i2c_address, 0)
-        # status_list = i2c_service.send_cell_limits(i2c_address, 0, [2500, 4200, 500, 2800, 200])
+        # status_list = i2c_service.request_measure_values(i2c_address, 0)
+        status_list = i2c_service.send_cell_limits(i2c_address, 0, [2500, 4200, 25, 500, 75, 0])
         print(f"Status list received: {status_list}")
         
     except Exception as e:
@@ -48,35 +48,43 @@ class I2CService:
         """Request the cell values of a specific slot.
         """
         print(f"Requesting measure values (i2c_adress: {i2c_adress}, slot_id: {slot_id})")
-        slot_request = self.cell_data_register << 4 | slot_id
+        slot_request = slot_id << 4 | self.cell_data_register
         response = self.bus.read_i2c_block_data(i2c_adress, slot_request, 8)
         
         print(f"Received response: {response} (i2c_adress: {i2c_adress}, slot_id: {slot_id})")
-        # slot = int.from_bytes(response[0], byteorder='little')
-        slot = response[0]
-        voltage = int.from_bytes(response[2:4], byteorder='little')
-        current = int.from_bytes(response[4:6], byteorder='little')
-        temperature = int.from_bytes(response[6:8], byteorder='little')
-        print(f"Decoded values: slot={slot}, voltage={voltage}, current={current}, temperature={temperature}")
+        voltage = int.from_bytes(response[0:2], byteorder='little')
+        current = int.from_bytes(response[2:4], byteorder='little')
+        temperature = int.from_bytes(response[4:6], byteorder='little')
+        cycle_num = int.from_bytes(response[6:7], byteorder='little')
+        cycle_state = int.from_bytes(response[7:8], byteorder='little')
+        print(f"Decoded response: voltage={voltage}, current={current}, temperature={temperature}, cycle_num={cycle_num}, cycle_state={cycle_state}")
         return response
     
     def send_cell_limits(self, i2c_adress: int, slot_id: int, limits: list[int]) -> bool:
         """Send the battery limits to the device.
-        
+
         Args:
             i2c_adress: Device address
             slot_id: Target slot (0-15)
-            limits: List of 16-bit values [max_voltage, min_voltage, max_current, min_current, max_temp]
+            limits: [min_volts (uint16), max_volts (int16), charge_fraction (uint8), capacity (uint16), cut_off_current (uint8), cycle_number (uint8)]
         """
-        # Convert each 16-bit value to 2 bytes in little-endian order
-        msg_bytes = bytearray([slot_id])  # Start with slot_id
-        for value in limits:
-            # Convert each value to 2 bytes (16-bit) in little-endian order
-            value_bytes = value.to_bytes(2, byteorder='little')
-            msg_bytes.extend(value_bytes)
+        msg_bytes = bytearray()
+        slot_request = slot_id << 4 | self.battery_limit_register
+        # min_volts: uint16
+        msg_bytes.extend(limits[0].to_bytes(2, byteorder='little', signed=False))
+        # max_volts: int16
+        msg_bytes.extend(limits[1].to_bytes(2, byteorder='little', signed=True))
+        # charge_fraction: uint8
+        msg_bytes.append(limits[2] & 0xFF) # Ensure it's within 0-255
+        # capacity: uint16
+        msg_bytes.extend(limits[3].to_bytes(2, byteorder='little', signed=False))
+        # cut_off_current: uint8
+        msg_bytes.append(limits[4] & 0xFF)
+        # cycle_number: uint8
+        msg_bytes.append(limits[5] & 0xFF)
 
-        self.bus.write_i2c_block_data(i2c_adress, self.battery_limit_register, msg_bytes)
-        print(f"Sent msg (bytes): {list(msg_bytes)} (i2c_adress: {i2c_adress}, slot_id: {slot_id})")
+        self.bus.write_i2c_block_data(i2c_adress, slot_request, msg_bytes)
+        print(f"Sent cell limits (bytes): {list(msg_bytes)} (i2c_adress: {i2c_adress}, slot_id: {slot_id})")
         return True
 
 if __name__ == "__main__":

+ 1 - 1
src/controllers/measurement_controller.py

@@ -67,7 +67,7 @@ class MeasurementController:
                         
                         # Check for unconfigured cell (could also be when the device resets)    
                         cell = slot.get_cell()  
-                        if status is DeviceStatus.WAITING and cell and not cell.limits_transmitted:
+                        if status is DeviceStatus.INSERTED and cell and not cell.limits_transmitted:
                             self._update_cell_limits(device, slot)
                             cell.limits_transmitted = True
                             continue

+ 12 - 8
src/services/i2c_service.py

@@ -40,10 +40,9 @@ class I2CService:
         if not self.bus:
             raise RuntimeError("I2C bus is not initialized. Check if debug mode is enabled.")
 
-        # Use the same slot addressing as in i2c_playground.py
-        slot_request = self.cell_data_register << 4 | slot_id
+        slot_request = slot_id << 4 | self.cell_data_register
         response = self.bus.read_i2c_block_data(i2c_adress, slot_request, 8)
-        # Unpack values as in playground
+        # Unpack values
         voltage = int.from_bytes(response[0:2], byteorder='little')
         current = int.from_bytes(response[2:4], byteorder='little')
         temperature = int.from_bytes(response[4:6], byteorder='little')
@@ -60,10 +59,15 @@ class I2CService:
         if not self.bus:
             raise RuntimeError("I2C bus is not initialized. Check if debug mode is enabled.")
 
-        # Pack limits as 16-bit values, little-endian, as in playground
-        msg_bytes = bytearray([slot_id])
-        for value in [limits.min_volt, limits.max_volt, limits.charge_fraction, limits.capacity, limits.cut_off_curr, limits.cycle_num]:
-            msg_bytes.extend(value.to_bytes(2, byteorder='little'))
-        self.bus.write_i2c_block_data(i2c_adress, self.battery_limit_register, msg_bytes)
+        msg_bytes = bytearray()
+        request = slot_id << 4 | self.battery_limit_register
+        msg_bytes.extend(limits.min_volt.to_bytes(2, byteorder='little', signed=False)) # uint16
+        msg_bytes.extend(limits.max_volt.to_bytes(2, byteorder='little', signed=True)) # uint16
+        msg_bytes.append(limits.charge_fraction & 0xFF) # uint8, ensure it's within 0-255
+        msg_bytes.extend(limits.capacity.to_bytes(2, byteorder='little', signed=False)) # uint16
+        msg_bytes.append(limits.cut_off_curr & 0xFF) # uint8, ensure it's within 0-255
+        msg_bytes.append(limits.cycle_num & 0xFF) # uint8, ensure it's within 0-255
+
+        self.bus.write_i2c_block_data(i2c_adress, request, msg_bytes)
         logger.debug(f"Sent cell limits (bytes): {list(msg_bytes)} (i2c_adress: {i2c_adress}, slot_id: {slot_id})")
         return True