# measurement_controller.py

import asyncio
import json
import logging
from datetime import datetime
from typing import Dict, List, Optional

from models.cell import Cell
from models.device import Device
from services.http_service import HTTPService
from services.i2c_service import I2CService, DeviceStatus
from services.mqtt_service import MQTTService
  11. logger = logging.getLogger(__name__)
  12. class MeasurementController:
  13. """Controls the measurement process for multiple devices and slots."""
  14. def __init__(self, config: dict, i2c_service: I2CService, http_service: HTTPService, mqtt_service: MQTTService):
  15. self.config = config
  16. self.i2c_service = i2c_service
  17. self.http_service = http_service
  18. self.mqtt_service = mqtt_service
  19. self.polling_task = None
  20. self.measurement_data_task = None
  21. self.slot_states = {} # Track states of all slots
  22. self.active_measurements: Dict[str, Dict[int, asyncio.Task]] = {}
  23. devices_config = config['devices']
  24. self.devices = {id: Device(id, config) for id, config in enumerate(devices_config)}
  25. self.subscribe_prefix = self.config['mqtt']['subscribe_prefix']
  26. for device_id in self.devices.keys():
  27. self.setup_mqtt_subscription(device_id)
  28. def setup_mqtt_subscription(self, device_id):
  29. """Setup MQTT subscriptions for each device."""
  30. topic = f"{self.subscribe_prefix}/device_{device_id}"
  31. self.mqtt_service.add_message_handler(topic, lambda client, userdata, msg, dev_id=device_id: self._handle_cell_insertion(client, userdata, msg, dev_id))
  32. async def start_polling(self):
  33. """Start the polling tasks."""
  34. self.polling_task = asyncio.create_task(self._poll_devices())
  35. self.measurement_data_task = asyncio.create_task(self._collect_measurement_data())
  36. async def stop_polling(self):
  37. """Stop the polling tasks."""
  38. if self.polling_task:
  39. self.polling_task.cancel()
  40. if self.measurement_data_task:
  41. self.measurement_data_task.cancel()
  42. async def _poll_devices(self):
  43. """Continuously poll all devices for slot status."""
  44. polling_interval = self.config['i2c']['polling_interval_ms'] / 1000.0 # Convert to seconds
  45. while True:
  46. await asyncio.sleep(polling_interval)
  47. try:
  48. for device_id, device in self.devices.items():
  49. # Read slot status via I2C
  50. status_list = await self.i2c_service.request_status_list(device_id, slot)
  51. for idx, status in enumerate(status_list):
  52. slot = device.slots[idx]
  53. prev_state = self.slot_states.get((device_id, slot))
  54. self.slot_states[(device_id, slot)] = status
  55. # Check for state transitions to "Done"
  56. if prev_state is DeviceStatus.MEASURING and status is DeviceStatus.DONE:
  57. self._process_done(device_id, slot)
  58. continue
  59. if status is DeviceStatus.MEASURING:
  60. self._process_measurement(device_id, slot)
  61. continue
  62. if status is DeviceStatus.ERROR and prev_state is not DeviceStatus.ERROR:
  63. logger.error(f"Error detected for device {device_id}, slot {slot}")
  64. continue
  65. except Exception as e:
  66. logger.error(f"Error during device polling: {str(e)}")
  67. async def _collect_measurement_data(self):
  68. """Collect measurement data from active slots."""
  69. measurement_interval = self.config['i2c']['measurement_data_interval_ms'] / 1000.0 # Convert to seconds
  70. while True:
  71. try:
  72. for (device_id, slot), status in self.slot_states.items():
  73. if status == "MEASURING":
  74. # Collect measurement data
  75. voltage = await self.i2c_service.read_voltage(device_id, slot)
  76. current = await self.i2c_service.read_current(device_id, slot)
  77. temp = await self.i2c_service.read_temperature(device_id, slot)
  78. # Store or process the measurement data
  79. await self._process_measurement_data(device_id, slot, {
  80. "voltage": voltage,
  81. "current": current,
  82. "temperature": temp,
  83. "timestamp": datetime.now().isoformat()
  84. })
  85. except Exception as e:
  86. logger.error(f"Error collecting measurement data: {str(e)}")
  87. await asyncio.sleep(measurement_interval)
  88. async def _process_measurement_data(self, device_id: int, slot: int, data: dict):
  89. """Process measurement data - implement your data handling logic here."""
  90. # Add to measurements list, save to database, etc.
  91. pass
  92. def _handle_cell_insertion(self, client, userdata, message: str, device_id: int):
  93. """Handle MQTT message for cell insertion."""
  94. try:
  95. data = json.loads(message.payload)
  96. slot = data.get('slot')
  97. cell_id = data.get('cell_id')
  98. if slot is None or cell_id is None:
  99. logger.error(f"Invalid message format: {message.payload}")
  100. return
  101. # Create and schedule the measurement task
  102. self.start_measurement(device_id, slot, cell_id)
  103. logger.info(f"Initiated measurement for device {device_id}, slot {slot}, cell {cell_id}")
  104. except json.JSONDecodeError:
  105. logger.error(f"Invalid JSON in MQTT message: {message}")
  106. except Exception as e:
  107. logger.error(f"Error handling cell insertion: {str(e)}")
  108. def start_measurement(self, device_id: int, slot: int, cell_id: int):
  109. """Start measurement cycle for a specific slot."""
  110. if device_id not in self.active_measurements:
  111. self.active_measurements[device_id] = {}
  112. # Cancel existing measurement if any
  113. if slot in self.active_measurements[device_id]:
  114. self.active_measurements[device_id][slot].cancel()
  115. logger.info(f"Starting measurement for device {device_id}, slot {slot}, cell {cell_id}")
  116. def _process_done(self, device_id: str, slot: int, cell_id: int):
  117. """Execute measurement cycles for a Cell."""
  118. # Calculate health
  119. estimated_capacity = Cell.estimate_capacity([m['current'] for m in measurements])
  120. logger.info(f"Measurement complete for cell {cell_id}. Estimated capacity: {estimated_capacity}%")
  121. # Publish completion message
  122. topic = f"{self.config['mqtt']['measurement_done_topic']}/device_{device_id}/slot_{slot}"
  123. self.mqtt_service.publish(topic, json.dumps({
  124. "cell_id": device.get_cell_id(slot),
  125. "device_id": device_id,
  126. "slot_id": slot.id,
  127. "capacity": device.get_capacity(slot),
  128. "status": DeviceStatus.DONE.name
  129. }))
  130. try:
  131. # Get Cell info from HTTP service
  132. cell_info = await self.http_service.get_cell_info(cell_id)
  133. measurements = []
  134. cycles = self.config['measurement']['cycles']
  135. sample_rate = self.config['measurement']['sample_rate_hz']
  136. except Exception as e:
  137. logger.error(f"Error during measurement: {str(e)}")
  138. finally:
  139. # Cleanup
  140. if device_id in self.active_measurements and \
  141. slot in self.active_measurements[device_id]:
  142. del self.active_measurements[device_id][slot]