integration_test.py 3.2 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586
  1. import asyncio
  2. import logging
  3. from robot_control.src.robot.controller import RobotController, Cell, CellStatus
  4. from robot_control.src.utils.config import ConfigParser
  5. from robot_control.src.vision.datamatrix import DataMatrixReader
  6. from robot_control.src.api.i2c_handler import I2C, MockI2C
  7. from robot_control.src.vendor.mcp3428 import MCP3428
  8. from robot_control.src.robot.pump_controller import PumpController
  9. from robot_control.src.api.gpio import PiGPIO, MockGPIO
  10. logging.basicConfig(
  11. level=logging.INFO,
  12. format='%(asctime)s - %(module)s - %(levelname)s - %(message)s',
  13. )
  14. logger = logging.getLogger(__name__)
  15. """
  16. This is a test script for the loader system.
  17. It initializes the robot controller, vision system, and I2C handler,
  18. """
  19. class LoaderSystem:
  20. def __init__(self):
  21. self.config = ConfigParser().config
  22. gpio_config = self.config.gpio
  23. if gpio_config.debug:
  24. self.gpio = MockGPIO()
  25. else:
  26. self.gpio = PiGPIO([gpio_config.pump_pin, gpio_config.valve_pin], [])
  27. self.controller = RobotController(self.config, self.gpio)
  28. # self.vision = DataMatrixReader(self.config)
  29. logger.info("Initializing LoaderSystem")
  30. # self.vision.initialize()
  31. # Test stuff
  32. self.test_dropoff = self.config.dropoff_grades[0]
  33. self.test_slot = self.config.measurement_devices[0].slots[3]
  34. # Use mock I2C device if debug is enabled
  35. i2c_device_class = MCP3428 if not self.config.i2c.debug else MockI2C
  36. self.i2c = I2C(i2c_device_class)
  37. self.i2c.initialize()
  38. logger.info(f"I2C initialized with {i2c_device_class.__name__}")
  39. self.pump_controller = PumpController(self.config, self.gpio)
  40. async def run(self):
  41. await self.controller.connect()
  42. await asyncio.gather(
  43. self._loader_loop(),
  44. self._poll_i2c_channels()
  45. )
  46. async def _poll_i2c_channels(self):
  47. while True:
  48. try:
  49. readings = await self.i2c.read_channels([1, 3, 4])
  50. for channel, value in readings.items():
  51. logger.debug(f"Channel {channel} reading: {value}")
  52. # if channel == 3: # Pressure reading
  53. # self.pump_controller.handle_tank_reading(value)
  54. if channel == 4:
  55. state = self.pump_controller.check_endeffector_state(value)
  56. self.controller.set_suction_state(state)
  57. except Exception as e:
  58. logger.error(f"Error polling I2C channels: {str(e)}")
  59. await asyncio.sleep(1) # Poll every second
  60. async def _loader_loop(self):
  61. await self.controller.pick_cell_from_slot(self.test_slot)
  62. await self.controller.dropoff_cell(self.test_dropoff)
  63. await self.controller.pick_cell_from_feeder()
  64. cell = Cell(id="test_cell", status=CellStatus.WAITING)
  65. await self.controller.insert_cell_to_slot(cell, self.test_slot)
  66. def __del__(self):
  67. self.pump_controller.cleanup() # Ensure PumpController cleans up gpio
  68. if __name__ == "__main__":
  69. loader_system = LoaderSystem()
  70. asyncio.run(loader_system.run())