| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128 |
- import asyncio
- import logging
- from models.cell import Cell, CellLimits
- from models.device import Device, DeviceStatus, Slot
- from services.i2c_service import I2CService
- from services.http_service import HTTPService
- from services.mqtt_service import MQTTService, InsertedCell
- logger = logging.getLogger(__name__)
- class MeasurementController:
- """Controls the measurement process for multiple devices and slots."""
-
- def __init__(self, config: dict, i2c_service: I2CService, http_service: HTTPService, mqtt_service: MQTTService):
- self.config = config
- self.i2c_service = i2c_service
- self.http_service = http_service
- self.mqtt_service = mqtt_service
- self.polling_task = None
- self.measurement_data_task = None
- devices_config = config['devices']
- self.devices = {conf['id']: Device(conf['id'], conf) for conf in devices_config}
-
- self.subscribe_prefix = self.config['mqtt']['subscribe_prefix']
- for _, device in self.devices.items():
- self.mqtt_service.register_device(device.id, len(device.slots), self._update_inserted_cell)
- async def start_polling(self):
- """Start polling task."""
- self.polling_task = asyncio.create_task(self._poll_devices())
- async def stop_polling(self):
- """Stop polling task."""
- if self.polling_task:
- self.polling_task.cancel()
-
- async def _poll_devices(self):
- """Continuously poll all devices for slot status."""
- polling_interval = self.config['i2c']['polling_interval_ms'] / 1000.0 # Convert to seconds
-
- while True:
- await asyncio.sleep(polling_interval)
- for device_id, device in self.devices.items():
- try:
- # Read slot status via I2C
- new_status_list = self.i2c_service.request_status_list(device.i2c_address, len(device.slots))
- if len(new_status_list) != len(device.slots):
- raise IndexError(f"Invalid status list length: {len(device.status_list)} != {len(device.slots)}")
- except Exception as e:
- logger.error(f"Error during polling device: {device.id}:\n{str(e)}")
- continue
-
- # Change the (change of) status for each slot and act accordingly
- for idx, status in enumerate(new_status_list):
- try:
- slot = device.slots[idx]
- prev_state = device.status_list[idx]
- device.status_list[idx] = status
- if slot.is_empty() and status is not DeviceStatus.EMPTY:
- logging.warning(f"Device {device.id}, Slot {slot.id} is empty, but status is {status}")
- continue
-
- # Check for unconfigured cell
- if status is DeviceStatus.INSERTED and not slot.get_cell().limits_transmitted:
- self._update_cell_limits(device, slot)
- continue
- # Check for state transitions to "DONE"
- if prev_state is DeviceStatus.MEASURING and status is DeviceStatus.DONE:
- self._process_done(device, slot)
- continue
- # Check for state transitions to "ERROR"
- if status is DeviceStatus.ERROR and prev_state is not DeviceStatus.ERROR:
- self._process_error(device, slot)
- continue
- if status is DeviceStatus.MEASURING:
- self._collect_measurement(device, slot)
- continue
- except Exception as e:
- logger.error(f"Error during processing status {status} for device {device.id}, slot {slot.id}: {str(e)}")
- continue
-
- def _update_cell_limits(self, device: Device, slot: Slot):
- """Send battery limits to the device."""
- cell = slot.get_cell()
- if cell is None:
- raise ValueError(f"No cell inserted in device {device.id}, slot {slot.id}")
- self.i2c_service.send_cell_limits(device.i2c_address, slot.id, cell.limits)
- cell.limits_transmitted = True
- def _collect_measurement(self, device: Device, slot: Slot):
- """Collect measurement data from active slots."""
- measure_values = self.i2c_service.request_measure_values(device.i2c_address, slot.id)
- cell = slot.get_cell()
- if cell is None:
- raise ValueError(f"No cell inserted in device {device.id}, slot {slot.id}")
- cell.add_measurement(measure_values)
- def _update_inserted_cell(self, insertion_info: InsertedCell):
- """Update the inserted cell id for a device."""
- cell_info = self.http_service.fetch_cell_info(insertion_info.cell_id)
- min_volt = max(cell_info['cell_type']['min_voltage'], self.config['measurement']['min_voltage'])
- max_volt = min(cell_info['cell_type']['max_voltage'], self.config['measurement']['max_voltage'])
- max_current = cell_info['cell_type']['capacity'] * self.config['measurement']['c_rate']
- limits = CellLimits(min_volt, max_volt, max_current)
- nom_capacity = cell_info['cell_type']['capacity']
- self.devices[insertion_info.device_id].slots[insertion_info.slot_id].insert_cell(Cell(insertion_info.cell_id, limits, nom_capacity))
-
- def _process_done(self, device: Device, slot: Slot):
- """Execute measurement cycles for a Cell."""
- cell = slot.get_cell()
- # Calculate health and capacity
- estimated_capacity = cell.estimate_capacity()
-
- logger.info(f"Measurement complete for cell {cell.id}. Estimated capacity: {estimated_capacity}%")
- self.mqtt_service.cell_finished(device.id, slot.id, cell.id, estimated_capacity, DeviceStatus.DONE)
- slot.remove_cell()
- def _process_error(self, device: Device, slot: Slot):
- """Handle errors during measurement."""
- logger.error(f"Error detected for device {device.id}, slot {slot.id}")
- self.mqtt_service.cell_finished(device.id, slot.id, slot.get_cell().id, 0.0, DeviceStatus.ERROR)
- slot.remove_cell()
-
|