| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180 |
import asyncio
import json
import logging
from datetime import datetime
from typing import Dict, List, Optional

from models.cell import Cell
from models.device import Device
from services.http_service import HTTPService
from services.i2c_service import I2CService, DeviceStatus
from services.mqtt_service import MQTTService
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class MeasurementController:
    """Controls the measurement process for multiple devices and slots.

    Two background tasks drive the controller: one polls every device for
    its per-slot status and reacts to status transitions, the other reads
    voltage/current/temperature samples for every slot whose last known
    status is ``DeviceStatus.MEASURING``.  Cell insertions arrive as MQTT
    messages on ``<subscribe_prefix>/device_<id>``.
    """

    def __init__(
        self,
        config: dict,
        i2c_service: "I2CService",
        http_service: "HTTPService",
        mqtt_service: "MQTTService",
    ):
        """Store the services, build the devices and subscribe to MQTT.

        Args:
            config: Application configuration.  This class reads
                ``config['devices']``, ``config['mqtt']`` and
                ``config['i2c']``.
            i2c_service: Service used to read slot status and samples.
            http_service: HTTP service (stored; not used by this class).
            mqtt_service: Service used to subscribe and publish.
        """
        self.config = config
        self.i2c_service = i2c_service
        self.http_service = http_service
        self.mqtt_service = mqtt_service
        self.polling_task = None
        self.measurement_data_task = None
        # Last status seen for each (device_id, slot) pair.
        self.slot_states = {}
        # Device ids come from enumerate() below, so the outer keys are ints.
        self.active_measurements: Dict[int, Dict[int, asyncio.Task]] = {}
        # Samples gathered while a slot is measuring, per (device_id, slot).
        self._measurements: Dict[tuple, List[dict]] = {}

        self.devices = {
            device_id: Device(device_id, device_config)
            for device_id, device_config in enumerate(config['devices'])
        }

        self.subscribe_prefix = self.config['mqtt']['subscribe_prefix']
        for device_id in self.devices:
            self.setup_mqtt_subscription(device_id)

    def setup_mqtt_subscription(self, device_id):
        """Register the cell-insertion handler for one device's topic."""
        topic = f"{self.subscribe_prefix}/device_{device_id}"
        # Bind device_id as a default so each handler keeps its own id
        # rather than whatever the variable holds when the callback fires.
        self.mqtt_service.add_message_handler(
            topic,
            lambda client, userdata, msg, dev_id=device_id: self._handle_cell_insertion(
                client, userdata, msg, dev_id
            ),
        )

    async def start_polling(self):
        """Start the polling tasks.

        A task that is still running is left alone, so calling this twice
        does not leave orphaned duplicate tasks behind.
        """
        if self.polling_task is None or self.polling_task.done():
            self.polling_task = asyncio.create_task(self._poll_devices())
        if self.measurement_data_task is None or self.measurement_data_task.done():
            self.measurement_data_task = asyncio.create_task(
                self._collect_measurement_data()
            )

    async def stop_polling(self):
        """Cancel the polling tasks and wait until they have finished."""
        tasks = [
            task
            for task in (self.polling_task, self.measurement_data_task)
            if task is not None
        ]
        self.polling_task = None
        self.measurement_data_task = None
        for task in tasks:
            task.cancel()
        if tasks:
            # return_exceptions=True collects each task's CancelledError as
            # a result instead of raising it into the caller.
            await asyncio.gather(*tasks, return_exceptions=True)

    async def _poll_devices(self):
        """Continuously poll all devices for slot status."""
        # Configured in milliseconds; asyncio.sleep() takes seconds.
        polling_interval = self.config['i2c']['polling_interval_ms'] / 1000.0

        while True:
            await asyncio.sleep(polling_interval)
            for device_id, device in self.devices.items():
                # One try per device, so a failing device does not stop the
                # remaining devices from being polled in this cycle.
                try:
                    # NOTE(review): the previous code also passed `slot`
                    # here, before it was ever assigned.  This assumes
                    # request_status_list takes only the device id; confirm
                    # against I2CService.
                    status_list = await self.i2c_service.request_status_list(device_id)
                    for idx, status in enumerate(status_list):
                        self._update_slot_state(device_id, device.slots[idx], status)
                except asyncio.CancelledError:
                    raise
                except Exception as e:
                    logger.error(f"Error during device polling: {str(e)}")

    def _update_slot_state(self, device_id, slot, status):
        """Record a slot's new status and react to the transition."""
        key = (device_id, slot)
        prev_state = self.slot_states.get(key)
        self.slot_states[key] = status

        if status is DeviceStatus.DONE:
            # Only the MEASURING -> DONE edge finishes a measurement.
            if prev_state is DeviceStatus.MEASURING:
                self._process_done(device_id, slot)
        elif status is DeviceStatus.MEASURING:
            if prev_state is not DeviceStatus.MEASURING:
                # A new run starts: drop samples left over from an earlier one.
                self._measurements[key] = []
            self._process_measurement(device_id, slot)
        elif status is DeviceStatus.ERROR and prev_state is not DeviceStatus.ERROR:
            logger.error(f"Error detected for device {device_id}, slot {slot}")

    def _process_measurement(self, device_id: int, slot: int):
        """Hook called on every poll while a slot reports MEASURING.

        Samples are gathered by ``_collect_measurement_data``, so nothing
        has to happen here.
        NOTE(review): the poll loop called this method but it was never
        defined; confirm whether more work was intended.
        """

    async def _collect_measurement_data(self):
        """Collect measurement data from active slots."""
        # Configured in milliseconds; asyncio.sleep() takes seconds.
        measurement_interval = (
            self.config['i2c']['measurement_data_interval_ms'] / 1000.0
        )

        while True:
            # Iterate over a snapshot: the poller may add keys to
            # slot_states while this coroutine is suspended in an await.
            for (device_id, slot), status in list(self.slot_states.items()):
                if status is not DeviceStatus.MEASURING:
                    continue
                try:
                    voltage = await self.i2c_service.read_voltage(device_id, slot)
                    current = await self.i2c_service.read_current(device_id, slot)
                    temp = await self.i2c_service.read_temperature(device_id, slot)

                    await self._process_measurement_data(device_id, slot, {
                        "voltage": voltage,
                        "current": current,
                        "temperature": temp,
                        "timestamp": datetime.now().isoformat(),
                    })
                except asyncio.CancelledError:
                    raise
                except Exception as e:
                    logger.error(f"Error collecting measurement data: {str(e)}")

            await asyncio.sleep(measurement_interval)

    async def _process_measurement_data(self, device_id: int, slot: int, data: dict):
        """Append one sample to the buffer kept for ``(device_id, slot)``.

        ``_process_done`` consumes this buffer when the slot finishes.
        """
        self._measurements.setdefault((device_id, slot), []).append(data)

    def _handle_cell_insertion(self, client, userdata, message, device_id: int):
        """Handle MQTT message for cell insertion.

        Args:
            client: Passed through by the MQTT callback; unused.
            userdata: Passed through by the MQTT callback; unused.
            message: MQTT message object.  Only ``message.payload`` is
                read; it must be a JSON object with ``slot`` and
                ``cell_id`` keys.
            device_id: Device whose topic delivered the message.

        Every failure is logged and swallowed, so nothing propagates into
        the MQTT callback machinery.
        """
        try:
            data = json.loads(message.payload)
            # Valid JSON that is not an object (list, number, ...) has no keys.
            if not isinstance(data, dict):
                logger.error(f"Invalid message format: {message.payload}")
                return

            slot = data.get('slot')
            cell_id = data.get('cell_id')
            if slot is None or cell_id is None:
                logger.error(f"Invalid message format: {message.payload}")
                return

            self.start_measurement(device_id, slot, cell_id)
            logger.info(f"Initiated measurement for device {device_id}, slot {slot}, cell {cell_id}")

        except json.JSONDecodeError:
            # Log the payload, consistent with the invalid-format messages.
            logger.error(f"Invalid JSON in MQTT message: {message.payload}")
        except Exception as e:
            logger.error(f"Error handling cell insertion: {str(e)}")

    def start_measurement(self, device_id: int, slot: int, cell_id: int):
        """Start measurement cycle for a specific slot.

        Cancels and forgets any task still registered for the slot, then
        logs the start.  No new task is created here.
        """
        slot_tasks = self.active_measurements.setdefault(device_id, {})

        # pop() so a cancelled task is not left behind in the registry.
        previous = slot_tasks.pop(slot, None)
        if previous is not None:
            previous.cancel()

        logger.info(f"Starting measurement for device {device_id}, slot {slot}, cell {cell_id}")

    def _process_done(self, device_id: int, slot: int, cell_id: Optional[int] = None):
        """Finish a measurement: estimate capacity and publish the result.

        Args:
            device_id: Key into ``self.devices``.
            slot: Slot that finished measuring.
            cell_id: Cell id used in the log line.  When omitted (the poll
                loop passes only device and slot) it is looked up with
                ``device.get_cell_id(slot)``.

        Errors are logged rather than raised so the poll loop keeps
        running; the slot's entry in ``active_measurements`` is removed in
        every case.

        NOTE(review): an earlier version also awaited
        ``http_service.get_cell_info(cell_id)`` and read
        ``config['measurement']``, but discarded the results and could not
        ``await`` inside this synchronous method.  That fragment was
        removed; confirm nothing depended on it.
        """
        try:
            device = self.devices[device_id]
            if cell_id is None:
                cell_id = device.get_cell_id(slot)

            # Take the buffered samples so they are not reused by a later run.
            measurements = self._measurements.pop((device_id, slot), [])
            estimated_capacity = Cell.estimate_capacity(
                [m['current'] for m in measurements]
            )
            logger.info(f"Measurement complete for cell {cell_id}. Estimated capacity: {estimated_capacity}%")

            topic = f"{self.config['mqtt']['measurement_done_topic']}/device_{device_id}/slot_{slot}"
            self.mqtt_service.publish(topic, json.dumps({
                "cell_id": device.get_cell_id(slot),
                "device_id": device_id,
                # Slots may be plain numbers or objects with an `id`
                # attribute; accept both.  NOTE(review): confirm the type
                # of the entries in device.slots.
                "slot_id": getattr(slot, "id", slot),
                "capacity": device.get_capacity(slot),
                "status": DeviceStatus.DONE.name,
            }))
        except Exception as e:
            logger.error(f"Error during measurement: {str(e)}")
        finally:
            slot_tasks = self.active_measurements.get(device_id)
            if slot_tasks is not None:
                slot_tasks.pop(slot, None)
-
|