# i2c_playground.py (3.7 KB)
  1. import smbus2
  2. def main():
  3. # Initialize configuration
  4. config = {
  5. 'i2c': {
  6. 'bus': 1,
  7. 'debug': False
  8. }
  9. }
  10. # Create I2C service instance
  11. i2c_service = I2CService(config)
  12. # Test parameters
  13. i2c_address = 0x20
  14. num_slots = 4
  15. try:
  16. # Request status list from the device
  17. # status_list = i2c_service.request_status_list(i2c_address, num_slots)
  18. # status_list = i2c_service.request_measure_values(i2c_address, 0)
  19. status_list = i2c_service.send_cell_limits(i2c_address, 0, [2500, 4200, 25, 500, 75, 0])
  20. print(f"Status list received: {status_list}")
  21. except Exception as e:
  22. print(f"Error occurred: {str(e)}")
  23. class I2CService:
  24. status_register = 0x01
  25. cell_data_register = 0x02
  26. battery_limit_register = 0x03
  27. def __init__(self, config: dict):
  28. self.config = config
  29. bus_number = config.get('i2c', {}).get('bus', 1)
  30. self.bus = smbus2.SMBus(bus_number)
  31. def request_status_list(self, i2c_adress: int, num_slots: int):
  32. """Request the status of a all slots."""
  33. print(f"Requesting status list (i2c_adress: {i2c_adress}, register: {self.status_register})")
  34. status_list = self.bus.read_i2c_block_data(i2c_adress, self.status_register, num_slots)
  35. print(f"Received status list: {status_list} (i2c_adress: {i2c_adress})")
  36. return status_list
  37. def request_measure_values(self, i2c_adress: int, slot_id: int) -> list[int]:
  38. """Request the cell values of a specific slot.
  39. """
  40. print(f"Requesting measure values (i2c_adress: {i2c_adress}, slot_id: {slot_id})")
  41. slot_request = slot_id << 4 | self.cell_data_register
  42. response = self.bus.read_i2c_block_data(i2c_adress, slot_request, 8)
  43. print(f"Received response: {response} (i2c_adress: {i2c_adress}, slot_id: {slot_id})")
  44. voltage = int.from_bytes(response[0:2], byteorder='little')
  45. current = int.from_bytes(response[2:4], byteorder='little')
  46. temperature = int.from_bytes(response[4:6], byteorder='little')
  47. cycle_num = int.from_bytes(response[6:7], byteorder='little')
  48. cycle_state = int.from_bytes(response[7:8], byteorder='little')
  49. print(f"Decoded response: voltage={voltage}, current={current}, temperature={temperature}, cycle_num={cycle_num}, cycle_state={cycle_state}")
  50. return response
  51. def send_cell_limits(self, i2c_adress: int, slot_id: int, limits: list[int]) -> bool:
  52. """Send the battery limits to the device.
  53. Args:
  54. i2c_adress: Device address
  55. slot_id: Target slot (0-15)
  56. limits: [min_volts (uint16), max_volts (int16), charge_fraction (uint8), capacity (uint16), cut_off_current (uint8), cycle_number (uint8)]
  57. """
  58. msg_bytes = bytearray()
  59. slot_request = slot_id << 4 | self.battery_limit_register
  60. # min_volts: uint16
  61. msg_bytes.extend(limits[0].to_bytes(2, byteorder='little', signed=False))
  62. # max_volts: int16
  63. msg_bytes.extend(limits[1].to_bytes(2, byteorder='little', signed=True))
  64. # charge_fraction: uint8
  65. msg_bytes.append(limits[2] & 0xFF) # Ensure it's within 0-255
  66. # capacity: uint16
  67. msg_bytes.extend(limits[3].to_bytes(2, byteorder='little', signed=False))
  68. # cut_off_current: uint8
  69. msg_bytes.append(limits[4] & 0xFF)
  70. # cycle_number: uint8
  71. msg_bytes.append(limits[5] & 0xFF)
  72. self.bus.write_i2c_block_data(i2c_adress, slot_request, msg_bytes)
  73. print(f"Sent cell limits (bytes): {list(msg_bytes)} (i2c_adress: {i2c_adress}, slot_id: {slot_id})")
  74. return True
  75. if __name__ == "__main__":
  76. main()