|
|
@@ -1,11 +1,10 @@
|
|
|
import asyncio
|
|
|
import logging
|
|
|
-from typing import Dict, List
|
|
|
-from datetime import datetime
|
|
|
+from typing import Dict
|
|
|
import json
|
|
|
-from models.cell import Cell
|
|
|
-from models.device import Device
|
|
|
-from services.i2c_service import I2CService, DeviceStatus
|
|
|
+from models.cell import Cell, MeasureValues
|
|
|
+from models.device import Device, DeviceStatus, Slot
|
|
|
+from services.i2c_service import I2CService
|
|
|
from services.http_service import HTTPService
|
|
|
from services.mqtt_service import MQTTService
|
|
|
|
|
|
@@ -22,16 +21,13 @@ class MeasurementController:
|
|
|
|
|
|
self.polling_task = None
|
|
|
self.measurement_data_task = None
|
|
|
- self.slot_states = {} # Track states of all slots
|
|
|
-
|
|
|
- self.active_measurements: Dict[str, Dict[int, asyncio.Task]] = {}
|
|
|
|
|
|
devices_config = config['devices']
|
|
|
- self.devices = {id: Device(id, config) for id, config in enumerate(devices_config)}
|
|
|
+ self.devices = [Device(conf['id'], conf) for conf in devices_config]
|
|
|
|
|
|
self.subscribe_prefix = self.config['mqtt']['subscribe_prefix']
|
|
|
- for device_id in self.devices.keys():
|
|
|
- self.setup_mqtt_subscription(device_id)
|
|
|
+ for device in self.devices:
|
|
|
+ self.setup_mqtt_subscription(device.id)
|
|
|
|
|
|
def setup_mqtt_subscription(self, device_id):
|
|
|
"""Setup MQTT subscriptions for each device."""
|
|
|
@@ -41,14 +37,11 @@ class MeasurementController:
|
|
|
async def start_polling(self):
|
|
|
"""Start the polling tasks."""
|
|
|
self.polling_task = asyncio.create_task(self._poll_devices())
|
|
|
- self.measurement_data_task = asyncio.create_task(self._collect_measurement_data())
|
|
|
|
|
|
async def stop_polling(self):
|
|
|
"""Stop the polling tasks."""
|
|
|
if self.polling_task:
|
|
|
self.polling_task.cancel()
|
|
|
- if self.measurement_data_task:
|
|
|
- self.measurement_data_task.cancel()
|
|
|
|
|
|
async def _poll_devices(self):
|
|
|
"""Continuously poll all devices for slot status."""
|
|
|
@@ -57,59 +50,51 @@ class MeasurementController:
|
|
|
while True:
|
|
|
await asyncio.sleep(polling_interval)
|
|
|
try:
|
|
|
- for device_id, device in self.devices.items():
|
|
|
+ for device in self.devices:
|
|
|
# Read slot status via I2C
|
|
|
- status_list = await self.i2c_service.request_status_list(device_id, slot)
|
|
|
- for idx, status in enumerate(status_list):
|
|
|
+ new_status_list = await self.i2c_service.request_status_list(device.i2c_address)
|
|
|
+ if len(device.status_list) != len(device.slots):
|
|
|
+ raise IndexError(f"Invalid status list length: {len(device.status_list)} != {len(device.slots)}")
|
|
|
+
|
|
|
+ # Change the (change of) status for each slot and act accordingly
|
|
|
+ for idx, status in enumerate(new_status_list):
|
|
|
slot = device.slots[idx]
|
|
|
- prev_state = self.slot_states.get((device_id, slot))
|
|
|
- self.slot_states[(device_id, slot)] = status
|
|
|
+ prev_state = device.status_list[idx]
|
|
|
+ device.status_list[idx] = status
|
|
|
|
|
|
- # Check for state transitions to "Done"
|
|
|
- if prev_state is DeviceStatus.MEASURING and status is DeviceStatus.DONE:
|
|
|
- self._process_done(device_id, slot)
|
|
|
+ # Check for state transitions to "INSERTED"
|
|
|
+ if status is DeviceStatus.INSERTED and prev_state is not DeviceStatus.INSERTED:
|
|
|
+ self._update_cell_limits(device, slot)
|
|
|
continue
|
|
|
- if status is DeviceStatus.MEASURING:
|
|
|
- self._process_measurement(device_id, slot)
|
|
|
+ # Check for state transitions to "DONE"
|
|
|
+ if prev_state is DeviceStatus.MEASURING and status is DeviceStatus.DONE:
|
|
|
+ self._process_done(device, slot)
|
|
|
continue
|
|
|
+ # Check for state transitions to "ERROR"
|
|
|
if status is DeviceStatus.ERROR and prev_state is not DeviceStatus.ERROR:
|
|
|
- logger.error(f"Error detected for device {device_id}, slot {slot}")
|
|
|
+ logger.error(f"Error detected for device {device.id}, slot {slot.id}")
|
|
|
+ continue
|
|
|
+ if status is DeviceStatus.MEASURING:
|
|
|
+ self._collect_measurement(device, slot)
|
|
|
continue
|
|
|
|
|
|
except Exception as e:
|
|
|
- logger.error(f"Error during device polling: {str(e)}")
|
|
|
-
|
|
|
+ logger.error(f"Error during device polling: {str(e)}")
|
|
|
|
|
|
- async def _collect_measurement_data(self):
|
|
|
+ async def _update_cell_limits(self, device: Device, slot: Slot):
|
|
|
+ """Send battery limits to the device."""
|
|
|
+ cell_id = slot.get_cell().id
|
|
|
+ limits = self.http_service.fetch_cell_info(cell_id)
|
|
|
+ self.i2c_service.send_cell_limits(device.i2c_address, slot, limits)
|
|
|
+
|
|
|
+ async def _collect_measurement(self, device: Device, slot: Slot):
|
|
|
"""Collect measurement data from active slots."""
|
|
|
- measurement_interval = self.config['i2c']['measurement_data_interval_ms'] / 1000.0 # Convert to seconds
|
|
|
-
|
|
|
- while True:
|
|
|
- try:
|
|
|
- for (device_id, slot), status in self.slot_states.items():
|
|
|
- if status == "MEASURING":
|
|
|
- # Collect measurement data
|
|
|
- voltage = await self.i2c_service.read_voltage(device_id, slot)
|
|
|
- current = await self.i2c_service.read_current(device_id, slot)
|
|
|
- temp = await self.i2c_service.read_temperature(device_id, slot)
|
|
|
-
|
|
|
- # Store or process the measurement data
|
|
|
- await self._process_measurement_data(device_id, slot, {
|
|
|
- "voltage": voltage,
|
|
|
- "current": current,
|
|
|
- "temperature": temp,
|
|
|
- "timestamp": datetime.now().isoformat()
|
|
|
- })
|
|
|
-
|
|
|
- except Exception as e:
|
|
|
- logger.error(f"Error collecting measurement data: {str(e)}")
|
|
|
-
|
|
|
- await asyncio.sleep(measurement_interval)
|
|
|
-
|
|
|
- async def _process_measurement_data(self, device_id: int, slot: int, data: dict):
|
|
|
- """Process measurement data - implement your data handling logic here."""
|
|
|
- # Add to measurements list, save to database, etc.
|
|
|
- pass
|
|
|
+ try:
|
|
|
+ measure_values = await self.i2c_service.request_measure_values(device.i2c_address, slot.id)
|
|
|
+ slot.get_cell().add_measurement(measure_values)
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ logger.error(f"Error collecting measurement data: {str(e)}")
|
|
|
|
|
|
def _handle_cell_insertion(self, client, userdata, message: str, device_id: int):
|
|
|
"""Handle MQTT message for cell insertion."""
|
|
|
@@ -123,58 +108,31 @@ class MeasurementController:
|
|
|
return
|
|
|
|
|
|
# Create and schedule the measurement task
|
|
|
- self.start_measurement(device_id, slot, cell_id)
|
|
|
+ self._start_measurement(device_id, slot, cell_id)
|
|
|
logger.info(f"Initiated measurement for device {device_id}, slot {slot}, cell {cell_id}")
|
|
|
|
|
|
except json.JSONDecodeError:
|
|
|
logger.error(f"Invalid JSON in MQTT message: {message}")
|
|
|
except Exception as e:
|
|
|
logger.error(f"Error handling cell insertion: {str(e)}")
|
|
|
-
|
|
|
- def start_measurement(self, device_id: int, slot: int, cell_id: int):
|
|
|
- """Start measurement cycle for a specific slot."""
|
|
|
- if device_id not in self.active_measurements:
|
|
|
- self.active_measurements[device_id] = {}
|
|
|
-
|
|
|
- # Cancel existing measurement if any
|
|
|
- if slot in self.active_measurements[device_id]:
|
|
|
- self.active_measurements[device_id][slot].cancel()
|
|
|
-
|
|
|
- logger.info(f"Starting measurement for device {device_id}, slot {slot}, cell {cell_id}")
|
|
|
|
|
|
- def _process_done(self, device_id: str, slot: int, cell_id: int):
|
|
|
+ def _process_done(self, device: Device, slot: Slot):
|
|
|
"""Execute measurement cycles for a Cell."""
|
|
|
|
|
|
+ cell = slot.get_cell()
|
|
|
+ # Calculate health and capacity
|
|
|
+ estimated_capacity = cell.estimate_capacity()
|
|
|
|
|
|
- # Calculate health
|
|
|
- estimated_capacity = Cell.estimate_capacity([m['current'] for m in measurements])
|
|
|
-
|
|
|
- logger.info(f"Measurement complete for cell {cell_id}. Estimated capacity: {estimated_capacity}%")
|
|
|
+ logger.info(f"Measurement complete for cell {cell.id}. Estimated capacity: {estimated_capacity}%")
|
|
|
|
|
|
# Publish completion message
|
|
|
- topic = f"{self.config['mqtt']['measurement_done_topic']}/device_{device_id}/slot_{slot}"
|
|
|
+ topic = f"{self.config['mqtt']['measurement_done_topic']}/device_{device.id}/slot_{slot}"
|
|
|
self.mqtt_service.publish(topic, json.dumps({
|
|
|
- "cell_id": device.get_cell_id(slot),
|
|
|
- "device_id": device_id,
|
|
|
+ "cell_id": cell.id,
|
|
|
+ "device_id": device.id,
|
|
|
"slot_id": slot.id,
|
|
|
- "capacity": device.get_capacity(slot),
|
|
|
+ "capacity": estimated_capacity,
|
|
|
"status": DeviceStatus.DONE.name
|
|
|
}))
|
|
|
-
|
|
|
- try:
|
|
|
- # Get Cell info from HTTP service
|
|
|
- cell_info = await self.http_service.get_cell_info(cell_id)
|
|
|
-
|
|
|
- measurements = []
|
|
|
- cycles = self.config['measurement']['cycles']
|
|
|
- sample_rate = self.config['measurement']['sample_rate_hz']
|
|
|
-
|
|
|
-
|
|
|
- except Exception as e:
|
|
|
- logger.error(f"Error during measurement: {str(e)}")
|
|
|
- finally:
|
|
|
- # Cleanup
|
|
|
- if device_id in self.active_measurements and \
|
|
|
- slot in self.active_measurements[device_id]:
|
|
|
- del self.active_measurements[device_id][slot]
|
|
|
+ slot.remove_cell()
|
|
|
|