# measurement_controller.py 6.5 KB

import asyncio
import logging
from typing import Optional

from models.cell import Cell, CellLimits
from models.device import Device, DeviceStatus, Slot
from services.i2c_service import I2CService
from services.http_service import HTTPService
from services.mqtt_service import MQTTService, InsertedCell

logger = logging.getLogger(__name__)
  9. class MeasurementController:
  10. """Controls the measurement process for multiple devices and slots."""
  11. def __init__(self, config: dict, i2c_service: I2CService, http_service: HTTPService, mqtt_service: MQTTService):
  12. self.config = config
  13. self.i2c_service = i2c_service
  14. self.http_service = http_service
  15. self.mqtt_service = mqtt_service
  16. self.polling_task = None
  17. self.measurement_data_task = None
  18. devices_config = config['devices']
  19. self.devices = {conf['id']: Device(conf['id'], conf) for conf in devices_config}
  20. self.subscribe_prefix = self.config['mqtt']['subscribe_prefix']
  21. for _, device in self.devices.items():
  22. self.mqtt_service.register_device(device.id, len(device.slots), self._update_inserted_cell)
  23. async def start_polling(self):
  24. """Start polling task."""
  25. self.polling_task = asyncio.create_task(self._poll_devices())
  26. async def stop_polling(self):
  27. """Stop polling task."""
  28. if self.polling_task:
  29. self.polling_task.cancel()
  30. async def _poll_devices(self):
  31. """Continuously poll all devices for slot status."""
  32. polling_interval = self.config['i2c']['polling_interval_ms'] / 1000.0 # Convert to seconds
  33. while True:
  34. await asyncio.sleep(polling_interval)
  35. for device_id, device in self.devices.items():
  36. try:
  37. # Read slot status via I2C
  38. new_status_list = self.i2c_service.request_status_list(device.i2c_address, len(device.slots))
  39. if len(new_status_list) != len(device.slots):
  40. raise IndexError(f"Invalid status list length: {len(device.status_list)} != {len(device.slots)}")
  41. except Exception as e:
  42. logger.error(f"Error during polling device: {device.id}:\n{str(e)}")
  43. continue
  44. # Change the (change of) status for each slot and act accordingly
  45. for idx, status in enumerate(new_status_list):
  46. try:
  47. slot = device.slots[idx]
  48. prev_state = device.status_list[idx]
  49. device.status_list[idx] = status
  50. if slot.is_empty() and status is not DeviceStatus.EMPTY:
  51. logging.warning(f"Device {device.id}, Slot {slot.id} is empty, but status is {status}")
  52. continue
  53. # Check for unconfigured cell
  54. if status is DeviceStatus.INSERTED and not slot.get_cell().limits_transmitted:
  55. self._update_cell_limits(device, slot)
  56. continue
  57. # Check for state transitions to "DONE"
  58. if prev_state is DeviceStatus.MEASURING and status is DeviceStatus.DONE:
  59. self._process_done(device, slot)
  60. continue
  61. # Check for state transitions to "ERROR"
  62. if status is DeviceStatus.ERROR and prev_state is not DeviceStatus.ERROR:
  63. self._process_error(device, slot)
  64. continue
  65. if status is DeviceStatus.MEASURING:
  66. self._collect_measurement(device, slot)
  67. continue
  68. except Exception as e:
  69. logger.error(f"Error during processing status {status} for device {device.id}, slot {slot.id}: {str(e)}")
  70. continue
  71. def _update_cell_limits(self, device: Device, slot: Slot):
  72. """Send battery limits to the device."""
  73. cell = slot.get_cell()
  74. if cell is None:
  75. raise ValueError(f"No cell inserted in device {device.id}, slot {slot.id}")
  76. self.i2c_service.send_cell_limits(device.i2c_address, slot.id, cell.limits)
  77. cell.limits_transmitted = True
  78. def _collect_measurement(self, device: Device, slot: Slot):
  79. """Collect measurement data from active slots."""
  80. measure_values = self.i2c_service.request_measure_values(device.i2c_address, slot.id)
  81. cell = slot.get_cell()
  82. if cell is None:
  83. raise ValueError(f"No cell inserted in device {device.id}, slot {slot.id}")
  84. cell.add_measurement(measure_values)
  85. def _update_inserted_cell(self, insertion_info: InsertedCell):
  86. """Update the inserted cell id for a device."""
  87. cell_info = self.http_service.fetch_cell_info(insertion_info.cell_id)
  88. min_volt = max(cell_info['cell_type']['min_voltage'], self.config['measurement']['min_voltage'])
  89. max_volt = min(cell_info['cell_type']['max_voltage'], self.config['measurement']['max_voltage'])
  90. max_current = cell_info['cell_type']['capacity'] * self.config['measurement']['c_rate']
  91. limits = CellLimits(min_volt, max_volt, max_current)
  92. nom_capacity = cell_info['cell_type']['capacity']
  93. self.devices[insertion_info.device_id].slots[insertion_info.slot_id].insert_cell(Cell(insertion_info.cell_id, limits, nom_capacity))
  94. def _process_done(self, device: Device, slot: Slot):
  95. """Execute measurement cycles for a Cell."""
  96. cell = slot.get_cell()
  97. # Calculate health and capacity
  98. estimated_capacity = cell.estimate_capacity()
  99. logger.info(f"Measurement complete for cell {cell.id}. Estimated capacity: {estimated_capacity}%")
  100. self.mqtt_service.cell_finished(device.id, slot.id, cell.id, estimated_capacity, DeviceStatus.DONE)
  101. slot.remove_cell()
  102. def _process_error(self, device: Device, slot: Slot):
  103. """Handle errors during measurement."""
  104. logger.error(f"Error detected for device {device.id}, slot {slot.id}")
  105. self.mqtt_service.cell_finished(device.id, slot.id, slot.get_cell().id, 0.0, DeviceStatus.ERROR)
  106. slot.remove_cell()