import asyncio
import json
import logging
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple

from models.cell import Cell
from models.device import Device
from services.http_service import HTTPService
from services.i2c_service import DeviceStatus, I2CService
from services.mqtt_service import MQTTService

logger = logging.getLogger(__name__)

# Key used for all per-slot bookkeeping: (device id, slot as listed in device.slots).
_SlotKey = Tuple[int, Any]


class MeasurementController:
    """Controls the measurement process for multiple devices and slots.

    Two background tasks are run once ``start_polling`` is awaited:

    * ``_poll_devices`` reads the status of every slot over I2C and reacts
      to state transitions.
    * ``_collect_measurement_data`` samples voltage, current and temperature
      for every slot whose last known status is ``DeviceStatus.MEASURING``.

    Samples are buffered per slot and consumed when the slot transitions
    from ``MEASURING`` to ``DONE``.
    """

    def __init__(self, config: dict, i2c_service: I2CService,
                 http_service: HTTPService, mqtt_service: MQTTService):
        """Create the controller and register one MQTT handler per device.

        Args:
            config: Application configuration. Reads ``config['devices']``
                (one entry per device), ``config['mqtt']['subscribe_prefix']``
                and, later, the ``i2c`` and ``mqtt`` sections.
            i2c_service: Service used to query slot status and readings.
            http_service: HTTP service handle (stored; not used by the
                methods in this class).
            mqtt_service: Service used to subscribe and publish.
        """
        self.config = config
        self.i2c_service = i2c_service
        self.http_service = http_service
        self.mqtt_service = mqtt_service

        self.polling_task: Optional[asyncio.Task] = None
        self.measurement_data_task: Optional[asyncio.Task] = None

        # Last status seen for every (device_id, slot).
        self.slot_states: Dict[_SlotKey, DeviceStatus] = {}
        # Device ids come from enumerate(), so the outer key is an int.
        self.active_measurements: Dict[int, Dict[int, asyncio.Task]] = {}
        # Samples gathered while a slot is MEASURING, consumed on DONE.
        self._measurements: Dict[_SlotKey, List[dict]] = {}

        self.devices = {
            device_id: Device(device_id, device_config)
            for device_id, device_config in enumerate(config['devices'])
        }

        self.subscribe_prefix = self.config['mqtt']['subscribe_prefix']
        for device_id in self.devices:
            self.setup_mqtt_subscription(device_id)

    def setup_mqtt_subscription(self, device_id):
        """Register the cell-insertion handler for one device's topic.

        The topic is ``<subscribe_prefix>/device_<device_id>``.
        """
        topic = f"{self.subscribe_prefix}/device_{device_id}"
        # device_id is bound as a default argument so that each handler keeps
        # its own id instead of the loop's last value.
        self.mqtt_service.add_message_handler(
            topic,
            lambda client, userdata, msg, dev_id=device_id:
                self._handle_cell_insertion(client, userdata, msg, dev_id),
        )

    async def start_polling(self):
        """Start the polling tasks.

        A task that is already running is left alone, so calling this twice
        does not spawn duplicate loops.
        """
        if self.polling_task is None or self.polling_task.done():
            self.polling_task = asyncio.create_task(self._poll_devices())
        if self.measurement_data_task is None or self.measurement_data_task.done():
            self.measurement_data_task = asyncio.create_task(
                self._collect_measurement_data()
            )

    async def stop_polling(self):
        """Cancel the polling tasks and wait until they have finished."""
        tasks = [
            task
            for task in (self.polling_task, self.measurement_data_task)
            if task is not None
        ]
        for task in tasks:
            task.cancel()
        if tasks:
            # return_exceptions=True keeps the expected CancelledError (or a
            # late failure of a task) from propagating out of stop_polling.
            await asyncio.gather(*tasks, return_exceptions=True)
        self.polling_task = None
        self.measurement_data_task = None

    async def _poll_devices(self):
        """Continuously poll all devices for slot status."""
        polling_interval = self.config['i2c']['polling_interval_ms'] / 1000.0  # ms -> s

        while True:
            await asyncio.sleep(polling_interval)
            for device_id, device in self.devices.items():
                # Errors are handled per device so that one faulty device
                # does not prevent the others from being polled.
                try:
                    await self._poll_device(device_id, device)
                except asyncio.CancelledError:
                    # Never swallow cancellation, whatever its base class.
                    raise
                except Exception as e:
                    logger.exception("Error during device polling: %s", e)

    async def _poll_device(self, device_id: int, device: Device) -> None:
        """Read the slot statuses of one device and react to transitions."""
        # NOTE(review): the status list is requested per device; this assumes
        # request_status_list accepts the device id alone - confirm against
        # I2CService.
        status_list = await self.i2c_service.request_status_list(device_id)

        for idx, status in enumerate(status_list):
            slot = device.slots[idx]
            key = (device_id, slot)
            prev_state = self.slot_states.get(key)
            self.slot_states[key] = status

            if status is DeviceStatus.DONE and prev_state is DeviceStatus.MEASURING:
                self._process_done(device_id, slot)
            elif status is DeviceStatus.MEASURING:
                if prev_state is not DeviceStatus.MEASURING:
                    # A new run begins: discard samples left from an earlier
                    # run that never reached DONE.
                    self._measurements[key] = []
                self._process_measurement(device_id, slot)
            elif status is DeviceStatus.ERROR and prev_state is not DeviceStatus.ERROR:
                logger.error(f"Error detected for device {device_id}, slot {slot}")

    def _process_measurement(self, device_id: int, slot: int) -> None:
        """Hook invoked on every poll while a slot reports MEASURING.

        Intentionally a no-op: samples are gathered by
        ``_collect_measurement_data``. Override to add per-poll behaviour.
        """

    async def _collect_measurement_data(self):
        """Collect measurement data from active slots."""
        measurement_interval = (
            self.config['i2c']['measurement_data_interval_ms'] / 1000.0  # ms -> s
        )

        while True:
            # Iterate over a snapshot: the poller may add entries to
            # slot_states while this coroutine is suspended in an await.
            for (device_id, slot), status in list(self.slot_states.items()):
                if status is not DeviceStatus.MEASURING:
                    continue
                try:
                    await self._sample_slot(device_id, slot)
                except asyncio.CancelledError:
                    raise
                except Exception as e:
                    logger.exception("Error collecting measurement data: %s", e)

            await asyncio.sleep(measurement_interval)

    async def _sample_slot(self, device_id: int, slot: int) -> None:
        """Read one voltage/current/temperature sample and hand it on."""
        voltage = await self.i2c_service.read_voltage(device_id, slot)
        current = await self.i2c_service.read_current(device_id, slot)
        temp = await self.i2c_service.read_temperature(device_id, slot)

        await self._process_measurement_data(device_id, slot, {
            "voltage": voltage,
            "current": current,
            "temperature": temp,
            "timestamp": datetime.now().isoformat(),
        })

    async def _process_measurement_data(self, device_id: int, slot: int, data: dict):
        """Buffer one sample for the slot until its measurement is done.

        Args:
            device_id: Id of the device the sample belongs to.
            slot: Slot the sample belongs to.
            data: Sample with the keys ``voltage``, ``current``,
                ``temperature`` and ``timestamp``.
        """
        self._measurements.setdefault((device_id, slot), []).append(data)

    def _handle_cell_insertion(self, client, userdata, message, device_id: int):
        """Handle MQTT message for cell insertion.

        ``message.payload`` must be a JSON object containing ``slot`` and
        ``cell_id``. Invalid messages are logged and ignored; this handler
        never raises.
        """
        try:
            data = json.loads(message.payload)
        except json.JSONDecodeError:
            logger.error(f"Invalid JSON in MQTT message: {message.payload}")
            return

        try:
            if not isinstance(data, dict):
                # Valid JSON that is not an object (e.g. a list or number).
                logger.error(f"Invalid message format: {message.payload}")
                return

            slot = data.get('slot')
            cell_id = data.get('cell_id')
            if slot is None or cell_id is None:
                logger.error(f"Invalid message format: {message.payload}")
                return

            self.start_measurement(device_id, slot, cell_id)
            logger.info(
                f"Initiated measurement for device {device_id}, slot {slot}, cell {cell_id}"
            )
        except Exception as e:
            logger.exception("Error handling cell insertion: %s", e)

    def start_measurement(self, device_id: int, slot: int, cell_id: int):
        """Start measurement cycle for a specific slot.

        Cancels and forgets any task still registered for the slot in
        ``active_measurements``, then logs the start.
        """
        device_tasks = self.active_measurements.setdefault(device_id, {})

        # Cancel existing measurement if any.
        previous = device_tasks.pop(slot, None)
        if previous is not None:
            previous.cancel()

        logger.info(
            f"Starting measurement for device {device_id}, slot {slot}, cell {cell_id}"
        )

    def _process_done(self, device_id: int, slot: int, cell_id: Optional[int] = None):
        """Finish a measurement: estimate capacity and publish the result.

        Args:
            device_id: Id of the device whose slot finished.
            slot: The finished slot.
            cell_id: Id of the measured cell. When omitted, it is taken from
                ``device.get_cell_id(slot)``.

        Failures are logged rather than raised so that the polling loop
        keeps running. The slot's buffered samples and its entry in
        ``active_measurements`` are always released.
        """
        try:
            device = self.devices[device_id]
            if cell_id is None:
                cell_id = device.get_cell_id(slot)

            samples = self._measurements.pop((device_id, slot), [])
            currents = [sample['current'] for sample in samples]
            if currents:
                estimated_capacity = Cell.estimate_capacity(currents)
                logger.info(
                    "Measurement complete for cell %s. Estimated capacity: %s%%",
                    cell_id, estimated_capacity,
                )
            else:
                logger.warning(
                    "Measurement complete for cell %s but no current samples "
                    "were collected; capacity estimate skipped",
                    cell_id,
                )

            # Publish completion message.
            topic = (
                f"{self.config['mqtt']['measurement_done_topic']}"
                f"/device_{device_id}/slot_{slot}"
            )
            self.mqtt_service.publish(topic, json.dumps({
                "cell_id": cell_id,
                "device_id": device_id,
                # Slots may be plain ids or objects exposing ``id``.
                "slot_id": getattr(slot, "id", slot),
                "capacity": device.get_capacity(slot),
                "status": DeviceStatus.DONE.name,
            }))
        except Exception as e:
            logger.exception("Error during measurement: %s", e)
        finally:
            # Cleanup, whether or not publishing succeeded.
            self._measurements.pop((device_id, slot), None)
            self.active_measurements.get(device_id, {}).pop(slot, None)