import asyncio import logging from models.cell import Cell, CellLimits from models.device import Device, DeviceStatus, Slot from services.i2c_service import I2CService from services.http_service import HTTPService from services.mqtt_service import MQTTService, InsertedCell logger = logging.getLogger(__name__) class MeasurementController: """Controls the measurement process for multiple devices and slots.""" def __init__(self, config: dict, i2c_service: I2CService, http_service: HTTPService, mqtt_service: MQTTService): self.config = config self.i2c_service = i2c_service self.http_service = http_service self.mqtt_service = mqtt_service self.polling_task = None self.measurement_data_task = None devices_config = config['devices'] self.devices = {conf['id']: Device(conf['id'], conf) for conf in devices_config} self.subscribe_prefix = self.config['mqtt']['subscribe_prefix'] for _, device in self.devices.items(): self.mqtt_service.register_device(device.id, len(device.slots), self._update_inserted_cell) async def start_polling(self): """Start polling task.""" self.polling_task = asyncio.create_task(self._poll_devices()) async def stop_polling(self): """Stop polling task.""" if self.polling_task: self.polling_task.cancel() async def _poll_devices(self): """Continuously poll all devices for slot status.""" polling_interval = self.config['i2c']['polling_interval_ms'] / 1000.0 # Convert to seconds while True: await asyncio.sleep(polling_interval) for device_id, device in self.devices.items(): try: # Read slot status via I2C new_status_list = self.i2c_service.request_status_list(device.i2c_address, len(device.slots)) if len(new_status_list) != len(device.slots): raise IndexError(f"Invalid status list length: {len(device.status_list)} != {len(device.slots)}") except Exception as e: logger.error(f"Error during polling device: {device.id}:\n{str(e)}") continue # Change the (change of) status for each slot and act accordingly for idx, status in enumerate(new_status_list): try: slot = device.slots[idx] prev_state = device.status_list[idx] device.status_list[idx] = status if slot.is_empty() and status is not DeviceStatus.EMPTY: logging.warning(f"Device {device.id}, Slot {slot.id} is empty, but status is {status}") continue # Check for unconfigured cell if status is DeviceStatus.INSERTED and not slot.get_cell().limits_transmitted: self._update_cell_limits(device, slot) continue # Check for state transitions to "DONE" if prev_state is DeviceStatus.MEASURING and status is DeviceStatus.DONE: self._process_done(device, slot) continue # Check for state transitions to "ERROR" if status is DeviceStatus.ERROR and prev_state is not DeviceStatus.ERROR: self._process_error(device, slot) continue if status is DeviceStatus.MEASURING: self._collect_measurement(device, slot) continue except Exception as e: logger.error(f"Error during processing status {status} for device {device.id}, slot {slot.id}: {str(e)}") continue def _update_cell_limits(self, device: Device, slot: Slot): """Send battery limits to the device.""" cell = slot.get_cell() if cell is None: raise ValueError(f"No cell inserted in device {device.id}, slot {slot.id}") self.i2c_service.send_cell_limits(device.i2c_address, slot.id, cell.limits) cell.limits_transmitted = True def _collect_measurement(self, device: Device, slot: Slot): """Collect measurement data from active slots.""" measure_values = self.i2c_service.request_measure_values(device.i2c_address, slot.id) cell = slot.get_cell() if cell is None: raise ValueError(f"No cell inserted in device {device.id}, slot {slot.id}") cell.add_measurement(measure_values) def _update_inserted_cell(self, insertion_info: InsertedCell): """Update the inserted cell id for a device.""" cell_info = self.http_service.fetch_cell_info(insertion_info.cell_id) min_volt = max(cell_info['cell_type']['min_voltage'], self.config['measurement']['min_voltage']) max_volt = min(cell_info['cell_type']['max_voltage'], self.config['measurement']['max_voltage']) max_current = cell_info['cell_type']['capacity'] * self.config['measurement']['c_rate'] limits = CellLimits(min_volt, max_volt, max_current) nom_capacity = cell_info['cell_type']['capacity'] self.devices[insertion_info.device_id].slots[insertion_info.slot_id].insert_cell(Cell(insertion_info.cell_id, limits, nom_capacity)) def _process_done(self, device: Device, slot: Slot): """Execute measurement cycles for a Cell.""" cell = slot.get_cell() # Calculate health and capacity estimated_capacity = cell.estimate_capacity() logger.info(f"Measurement complete for cell {cell.id}. Estimated capacity: {estimated_capacity}%") self.mqtt_service.cell_finished(device.id, slot.id, cell.id, estimated_capacity, DeviceStatus.DONE) slot.remove_cell() def _process_error(self, device: Device, slot: Slot): """Handle errors during measurement.""" logger.error(f"Error detected for device {device.id}, slot {slot.id}") self.mqtt_service.cell_finished(device.id, slot.id, slot.get_cell().id, 0.0, DeviceStatus.ERROR) slot.remove_cell()