# test_measurement_controller.py (5.3 KB)
import asyncio
import itertools
import logging
from unittest.mock import ANY, MagicMock, patch

import pytest

from src.controllers.measurement_controller import MeasurementController
from src.models.cell import Cell, CellLimits, MeasureValues
from src.models.device import DeviceStatus
from src.services.mqtt_service import InsertedCell
  9. @pytest.fixture
  10. def mock_services():
  11. i2c_service = MagicMock()
  12. http_service = MagicMock()
  13. mqtt_service = MagicMock()
  14. return i2c_service, http_service, mqtt_service
  15. @pytest.fixture
  16. def config():
  17. return {
  18. 'devices': [
  19. {
  20. 'id': 1,
  21. 'i2c_address': 0x48,
  22. 'num_slots': 2
  23. }
  24. ],
  25. 'mqtt': {
  26. 'subscribe_prefix': 'test/'
  27. },
  28. 'i2c': {
  29. 'polling_interval_ms': 100
  30. },
  31. 'measurement': {
  32. 'min_voltage': 2.5,
  33. 'max_voltage': 4.2,
  34. 'c_rate': 0.5
  35. }
  36. }
  37. @pytest.fixture
  38. def controller(mock_services, config):
  39. i2c_service, http_service, mqtt_service = mock_services
  40. return MeasurementController(config, i2c_service, http_service, mqtt_service)
  41. @pytest.mark.asyncio
  42. async def test_device_polling(controller, mock_services):
  43. i2c_service, _, _ = mock_services
  44. # Setup mock responses
  45. i2c_service.request_status_list.return_value = [DeviceStatus.EMPTY, DeviceStatus.EMPTY]
  46. # Start polling
  47. await controller.start_polling()
  48. await asyncio.sleep(0.2) # Allow some polling cycles
  49. await controller.stop_polling()
  50. # Verify I2C service was called
  51. assert i2c_service.request_status_list.called
  52. assert i2c_service.request_status_list.call_count >= 1
  53. @pytest.mark.asyncio
  54. async def test_cell_insertion_flow(controller, mock_services):
  55. i2c_service, http_service, mqtt_service = mock_services
  56. # Setup mock responses
  57. http_service.fetch_cell_info.return_value = {
  58. 'cell_type': {
  59. 'min_voltage': 2.5,
  60. 'max_voltage': 4.2,
  61. 'capacity': 3000,
  62. 'name': 'Test Cell'
  63. }
  64. }
  65. # Simulate cell insertion
  66. insertion_info = InsertedCell(device_id=1, slot_id=0, cell_id=123)
  67. controller._update_inserted_cell(insertion_info)
  68. # Verify HTTP service was called to fetch cell info
  69. http_service.fetch_cell_info.assert_called_once_with(123)
  70. # Verify cell was inserted with correct parameters
  71. device = controller.devices[1]
  72. cell = device.slots[0].get_cell()
  73. assert cell is not None
  74. assert cell.id == 123
  75. assert cell.nom_capacity == 3000
  76. @pytest.mark.asyncio
  77. async def test_measurement_cycle(controller, mock_services):
  78. i2c_service, _, mqtt_service = mock_services
  79. capacity = 3000
  80. # Setup initial state
  81. device = controller.devices[1]
  82. cell = Cell(123, CellLimits(2.5, 4.2, 1.5), capacity)
  83. device.slots[0].insert_cell(cell)
  84. # Setup mock responses for state transitions
  85. i2c_service.request_status_list.side_effect = [
  86. [DeviceStatus.INSERTED, DeviceStatus.EMPTY], # Initial state
  87. [DeviceStatus.MEASURING, DeviceStatus.EMPTY], # Measuring
  88. [DeviceStatus.MEASURING, DeviceStatus.EMPTY], # Measuring
  89. [DeviceStatus.DONE, DeviceStatus.EMPTY], # Measurement complete
  90. ]
  91. i2c_service.request_measure_values.return_value = MeasureValues(4.2, 3.6, 1.5)
  92. # Start polling
  93. await controller.start_polling()
  94. await asyncio.sleep(1) # Allow state transitions
  95. await controller.stop_polling()
  96. # Verify measurement cycle
  97. assert i2c_service.send_cell_limits.called
  98. assert i2c_service.request_measure_values.called
  99. assert mqtt_service.cell_finished.called
  100. # Verify health calculation is called
  101. device_id, slot_id, cell_id, health, status = mqtt_service.cell_finished.call_args[0]
  102. # Verify health calculation and other parameters
  103. assert device_id == 1
  104. assert slot_id == 0
  105. assert cell_id == 123
  106. assert health != -1.0 # Verify that health was actually calculated
  107. assert status == DeviceStatus.DONE
  108. @pytest.mark.asyncio
  109. async def test_error_handling(controller, mock_services):
  110. i2c_service, _, mqtt_service = mock_services
  111. # Setup initial state
  112. device = controller.devices[1]
  113. cell = Cell(123, CellLimits(2.5, 4.2, 1.5), 3000)
  114. device.slots[0].insert_cell(cell)
  115. # Simulate error condition
  116. i2c_service.request_status_list.return_value = [DeviceStatus.ERROR, DeviceStatus.ERROR]
  117. # Start polling
  118. await controller.start_polling()
  119. await asyncio.sleep(0.5)
  120. await controller.stop_polling()
  121. # Verify error handling
  122. mqtt_service.cell_finished.assert_called_with(1, 0, 123, 0.0, DeviceStatus.ERROR)
  123. assert device.slots[0].get_cell() is None # Cell should be removed after error
  124. @pytest.mark.asyncio
  125. async def test_invalid_status_list(controller, mock_services, caplog):
  126. i2c_service, _, _ = mock_services
  127. caplog.set_level(logging.ERROR)
  128. # Setup mock to return invalid status list
  129. i2c_service.request_status_list.return_value = [DeviceStatus.EMPTY] # Too short
  130. # Start polling
  131. await controller.start_polling()
  132. await asyncio.sleep(0.2) # Allow some polling cycles
  133. await controller.stop_polling()
  134. # Verify error was logged
  135. assert "Invalid status list length" in caplog.text
# TODO: add a pytest.ini file to configure asyncio test handling (e.g. for pytest-asyncio).