|
@@ -5,7 +5,7 @@ from datetime import datetime
|
|
|
import json
|
|
import json
|
|
|
from models.cell import Cell
|
|
from models.cell import Cell
|
|
|
from models.device import Device
|
|
from models.device import Device
|
|
|
-from services.i2c_service import I2CService
|
|
|
|
|
|
|
+from services.i2c_service import I2CService, DeviceStatus
|
|
|
from services.http_service import HTTPService
|
|
from services.http_service import HTTPService
|
|
|
from services.mqtt_service import MQTTService
|
|
from services.mqtt_service import MQTTService
|
|
|
|
|
|
|
@@ -20,20 +20,97 @@ class MeasurementController:
|
|
|
self.http_service = http_service
|
|
self.http_service = http_service
|
|
|
self.mqtt_service = mqtt_service
|
|
self.mqtt_service = mqtt_service
|
|
|
|
|
|
|
|
|
|
+ self.polling_task = None
|
|
|
|
|
+ self.measurement_data_task = None
|
|
|
|
|
+ self.slot_states = {} # Track states of all slots
|
|
|
|
|
+
|
|
|
self.active_measurements: Dict[str, Dict[int, asyncio.Task]] = {}
|
|
self.active_measurements: Dict[str, Dict[int, asyncio.Task]] = {}
|
|
|
|
|
|
|
|
devices_config = config['devices']
|
|
devices_config = config['devices']
|
|
|
self.devices = {id: Device(id, config) for id, config in enumerate(devices_config)}
|
|
self.devices = {id: Device(id, config) for id, config in enumerate(devices_config)}
|
|
|
|
|
|
|
|
- self.topic_prefix = self.config['mqtt']['topic_prefix']
|
|
|
|
|
|
|
+ self.subscribe_prefix = self.config['mqtt']['subscribe_prefix']
|
|
|
for device_id in self.devices.keys():
|
|
for device_id in self.devices.keys():
|
|
|
self.setup_mqtt_subscription(device_id)
|
|
self.setup_mqtt_subscription(device_id)
|
|
|
|
|
|
|
|
def setup_mqtt_subscription(self, device_id):
|
|
def setup_mqtt_subscription(self, device_id):
|
|
|
"""Setup MQTT subscriptions for each device."""
|
|
"""Setup MQTT subscriptions for each device."""
|
|
|
- topic = f"{self.topic_prefix}/{device_id}"
|
|
|
|
|
|
|
+ topic = f"{self.subscribe_prefix}/device_{device_id}"
|
|
|
self.mqtt_service.add_message_handler(topic, lambda client, userdata, msg, dev_id=device_id: self._handle_cell_insertion(client, userdata, msg, dev_id))
|
|
self.mqtt_service.add_message_handler(topic, lambda client, userdata, msg, dev_id=device_id: self._handle_cell_insertion(client, userdata, msg, dev_id))
|
|
|
|
|
|
|
|
|
|
+ async def start_polling(self):
|
|
|
|
|
+ """Start the polling tasks."""
|
|
|
|
|
+ self.polling_task = asyncio.create_task(self._poll_devices())
|
|
|
|
|
+ self.measurement_data_task = asyncio.create_task(self._collect_measurement_data())
|
|
|
|
|
+
|
|
|
|
|
+ async def stop_polling(self):
|
|
|
|
|
+ """Stop the polling tasks."""
|
|
|
|
|
+ if self.polling_task:
|
|
|
|
|
+ self.polling_task.cancel()
|
|
|
|
|
+ if self.measurement_data_task:
|
|
|
|
|
+ self.measurement_data_task.cancel()
|
|
|
|
|
+
|
|
|
|
|
+ async def _poll_devices(self):
|
|
|
|
|
+ """Continuously poll all devices for slot status."""
|
|
|
|
|
+ polling_interval = self.config['i2c']['polling_interval_ms'] / 1000.0 # Convert to seconds
|
|
|
|
|
+
|
|
|
|
|
+ while True:
|
|
|
|
|
+ await asyncio.sleep(polling_interval)
|
|
|
|
|
+ try:
|
|
|
|
|
+ for device_id, device in self.devices.items():
|
|
|
|
|
+ # Read slot status via I2C
|
|
|
|
|
+ status_list = await self.i2c_service.request_status_list(device_id, slot)
|
|
|
|
|
+ for idx, status in enumerate(status_list):
|
|
|
|
|
+ slot = device.slots[idx]
|
|
|
|
|
+ prev_state = self.slot_states.get((device_id, slot))
|
|
|
|
|
+ self.slot_states[(device_id, slot)] = status
|
|
|
|
|
+
|
|
|
|
|
+ # Check for state transitions to "Done"
|
|
|
|
|
+ if prev_state is DeviceStatus.MEASURING and status is DeviceStatus.DONE:
|
|
|
|
|
+ self._process_done(device_id, slot)
|
|
|
|
|
+ continue
|
|
|
|
|
+ if status is DeviceStatus.MEASURING:
|
|
|
|
|
+ self._process_measurement(device_id, slot)
|
|
|
|
|
+ continue
|
|
|
|
|
+ if status is DeviceStatus.ERROR and prev_state is not DeviceStatus.ERROR:
|
|
|
|
|
+ logger.error(f"Error detected for device {device_id}, slot {slot}")
|
|
|
|
|
+ continue
|
|
|
|
|
+
|
|
|
|
|
+ except Exception as e:
|
|
|
|
|
+ logger.error(f"Error during device polling: {str(e)}")
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+ async def _collect_measurement_data(self):
|
|
|
|
|
+ """Collect measurement data from active slots."""
|
|
|
|
|
+ measurement_interval = self.config['i2c']['measurement_data_interval_ms'] / 1000.0 # Convert to seconds
|
|
|
|
|
+
|
|
|
|
|
+ while True:
|
|
|
|
|
+ try:
|
|
|
|
|
+ for (device_id, slot), status in self.slot_states.items():
|
|
|
|
|
+ if status == "MEASURING":
|
|
|
|
|
+ # Collect measurement data
|
|
|
|
|
+ voltage = await self.i2c_service.read_voltage(device_id, slot)
|
|
|
|
|
+ current = await self.i2c_service.read_current(device_id, slot)
|
|
|
|
|
+ temp = await self.i2c_service.read_temperature(device_id, slot)
|
|
|
|
|
+
|
|
|
|
|
+ # Store or process the measurement data
|
|
|
|
|
+ await self._process_measurement_data(device_id, slot, {
|
|
|
|
|
+ "voltage": voltage,
|
|
|
|
|
+ "current": current,
|
|
|
|
|
+ "temperature": temp,
|
|
|
|
|
+ "timestamp": datetime.now().isoformat()
|
|
|
|
|
+ })
|
|
|
|
|
+
|
|
|
|
|
+ except Exception as e:
|
|
|
|
|
+ logger.error(f"Error collecting measurement data: {str(e)}")
|
|
|
|
|
+
|
|
|
|
|
+ await asyncio.sleep(measurement_interval)
|
|
|
|
|
+
|
|
|
|
|
+ async def _process_measurement_data(self, device_id: int, slot: int, data: dict):
|
|
|
|
|
+ """Process measurement data - implement your data handling logic here."""
|
|
|
|
|
+ # Add to measurements list, save to database, etc.
|
|
|
|
|
+ pass
|
|
|
|
|
+
|
|
|
def _handle_cell_insertion(self, client, userdata, message: str, device_id: int):
|
|
def _handle_cell_insertion(self, client, userdata, message: str, device_id: int):
|
|
|
"""Handle MQTT message for cell insertion."""
|
|
"""Handle MQTT message for cell insertion."""
|
|
|
try:
|
|
try:
|
|
@@ -46,7 +123,7 @@ class MeasurementController:
|
|
|
return
|
|
return
|
|
|
|
|
|
|
|
# Create and schedule the measurement task
|
|
# Create and schedule the measurement task
|
|
|
- asyncio.create_task(self.start_measurement(device_id, slot, cell_id))
|
|
|
|
|
|
|
+ self.start_measurement(device_id, slot, cell_id)
|
|
|
logger.info(f"Initiated measurement for device {device_id}, slot {slot}, cell {cell_id}")
|
|
logger.info(f"Initiated measurement for device {device_id}, slot {slot}, cell {cell_id}")
|
|
|
|
|
|
|
|
except json.JSONDecodeError:
|
|
except json.JSONDecodeError:
|
|
@@ -54,7 +131,7 @@ class MeasurementController:
|
|
|
except Exception as e:
|
|
except Exception as e:
|
|
|
logger.error(f"Error handling cell insertion: {str(e)}")
|
|
logger.error(f"Error handling cell insertion: {str(e)}")
|
|
|
|
|
|
|
|
- async def start_measurement(self, device_id: int, slot: int, cell_id: int):
|
|
|
|
|
|
|
+ def start_measurement(self, device_id: int, slot: int, cell_id: int):
|
|
|
"""Start measurement cycle for a specific slot."""
|
|
"""Start measurement cycle for a specific slot."""
|
|
|
if device_id not in self.active_measurements:
|
|
if device_id not in self.active_measurements:
|
|
|
self.active_measurements[device_id] = {}
|
|
self.active_measurements[device_id] = {}
|
|
@@ -63,14 +140,27 @@ class MeasurementController:
|
|
|
if slot in self.active_measurements[device_id]:
|
|
if slot in self.active_measurements[device_id]:
|
|
|
self.active_measurements[device_id][slot].cancel()
|
|
self.active_measurements[device_id][slot].cancel()
|
|
|
|
|
|
|
|
- # Create new measurement task
|
|
|
|
|
- task = asyncio.create_task(
|
|
|
|
|
- self._measure_cycle(device_id, slot, cell_id)
|
|
|
|
|
- )
|
|
|
|
|
- self.active_measurements[device_id][slot] = task
|
|
|
|
|
|
|
+ logger.info(f"Starting measurement for device {device_id}, slot {slot}, cell {cell_id}")
|
|
|
|
|
|
|
|
- async def _measure_cycle(self, device_id: str, slot: int, cell_id: int):
|
|
|
|
|
|
|
+ def _process_done(self, device_id: str, slot: int, cell_id: int):
|
|
|
"""Execute measurement cycles for a Cell."""
|
|
"""Execute measurement cycles for a Cell."""
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+ # Calculate health
|
|
|
|
|
+ estimated_capacity = Cell.estimate_capacity([m['current'] for m in measurements])
|
|
|
|
|
+
|
|
|
|
|
+ logger.info(f"Measurement complete for cell {cell_id}. Estimated capacity: {estimated_capacity}%")
|
|
|
|
|
+
|
|
|
|
|
+ # Publish completion message
|
|
|
|
|
+ topic = f"{self.config['mqtt']['measurement_done_topic']}/device_{device_id}/slot_{slot}"
|
|
|
|
|
+ self.mqtt_service.publish(topic, json.dumps({
|
|
|
|
|
+ "cell_id": device.get_cell_id(slot),
|
|
|
|
|
+ "device_id": device_id,
|
|
|
|
|
+ "slot_id": slot.id,
|
|
|
|
|
+ "capacity": device.get_capacity(slot),
|
|
|
|
|
+ "status": DeviceStatus.DONE.name
|
|
|
|
|
+ }))
|
|
|
|
|
+
|
|
|
try:
|
|
try:
|
|
|
# Get Cell info from HTTP service
|
|
# Get Cell info from HTTP service
|
|
|
cell_info = await self.http_service.get_cell_info(cell_id)
|
|
cell_info = await self.http_service.get_cell_info(cell_id)
|
|
@@ -79,43 +169,7 @@ class MeasurementController:
|
|
|
cycles = self.config['measurement']['cycles']
|
|
cycles = self.config['measurement']['cycles']
|
|
|
sample_rate = self.config['measurement']['sample_rate_hz']
|
|
sample_rate = self.config['measurement']['sample_rate_hz']
|
|
|
|
|
|
|
|
- for cycle in range(cycles):
|
|
|
|
|
- logger.info(f"Starting cycle {cycle+1}/{cycles} for cell {cell_id}")
|
|
|
|
|
- cycle_measurements = []
|
|
|
|
|
-
|
|
|
|
|
- while True:
|
|
|
|
|
- # Read measurements
|
|
|
|
|
- voltage = await self.i2c_service.read_voltage(device_id, slot)
|
|
|
|
|
- current = await self.i2c_service.read_current(device_id, slot)
|
|
|
|
|
- temp = await self.i2c_service.read_temperature(device_id, slot)
|
|
|
|
|
-
|
|
|
|
|
- # Check safety limits
|
|
|
|
|
- if not self._check_safety_limits(voltage, temp):
|
|
|
|
|
- raise Exception("Safety limits exceeded")
|
|
|
|
|
-
|
|
|
|
|
- cycle_measurements.append({
|
|
|
|
|
- 'timestamp': datetime.now().isoformat(),
|
|
|
|
|
- 'voltage': voltage,
|
|
|
|
|
- 'current': current,
|
|
|
|
|
- 'temperature': temp
|
|
|
|
|
- })
|
|
|
|
|
-
|
|
|
|
|
- # Check if cycle complete
|
|
|
|
|
- if self._is_cycle_complete(cycle_measurements):
|
|
|
|
|
- break
|
|
|
|
|
-
|
|
|
|
|
- await asyncio.sleep(1/sample_rate)
|
|
|
|
|
-
|
|
|
|
|
- measurements.extend(cycle_measurements)
|
|
|
|
|
-
|
|
|
|
|
- # Rest period between cycles
|
|
|
|
|
- await asyncio.sleep(self.config['measurement']['rest_time_minutes'] * 60)
|
|
|
|
|
-
|
|
|
|
|
- # Calculate health
|
|
|
|
|
- health = calculate_health([m['current'] for m in measurements])
|
|
|
|
|
-
|
|
|
|
|
- logger.info(f"Measurement complete for cell {cell_id}. Health: {health}%")
|
|
|
|
|
-
|
|
|
|
|
|
|
+
|
|
|
except Exception as e:
|
|
except Exception as e:
|
|
|
logger.error(f"Error during measurement: {str(e)}")
|
|
logger.error(f"Error during measurement: {str(e)}")
|
|
|
finally:
|
|
finally:
|
|
@@ -123,18 +177,4 @@ class MeasurementController:
|
|
|
if device_id in self.active_measurements and \
|
|
if device_id in self.active_measurements and \
|
|
|
slot in self.active_measurements[device_id]:
|
|
slot in self.active_measurements[device_id]:
|
|
|
del self.active_measurements[device_id][slot]
|
|
del self.active_measurements[device_id][slot]
|
|
|
-
|
|
|
|
|
- def _check_safety_limits(self, voltage: float, temperature: float) -> bool:
|
|
|
|
|
- """Check if measurements are within safety limits."""
|
|
|
|
|
- return (self.config['measurement']['min_voltage'] <= voltage <=
|
|
|
|
|
- self.config['measurement']['max_voltage'] and
|
|
|
|
|
- temperature <= self.config['measurement']['max_temperature_c'])
|
|
|
|
|
-
|
|
|
|
|
- def _is_cycle_complete(self, measurements: List[dict]) -> bool:
|
|
|
|
|
- """Determine if a measurement cycle is complete."""
|
|
|
|
|
- if not measurements:
|
|
|
|
|
- return False
|
|
|
|
|
-
|
|
|
|
|
- # TODO [SG]: Add cycle completion logic here
|
|
|
|
|
- # Multiple compelte cycles or first cycle looks bad
|
|
|
|
|
- return len(measurements) > 100 # Simplified example
|
|
|
|
|
|
|
+
|