integration_test.py 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119
  1. import signal
  2. import sys
  3. import asyncio
  4. import logging
  5. from robot_control.src.robot.controller import RobotController, Cell, CellStatus
  6. from robot_control.src.utils.config import ConfigParser
  7. from robot_control.src.vision.datamatrix import DataMatrixReader
  8. from robot_control.src.api.i2c_handler import I2C, MockI2C
  9. from robot_control.src.vendor.mcp3428 import MCP3428
  10. from robot_control.src.robot.pump_controller import PumpController
  11. from robot_control.src.api.gpio import PiGPIO, MockGPIO
  12. logging.basicConfig(
  13. level=logging.INFO,
  14. format='%(asctime)s - %(module)s - %(levelname)s - %(message)s',
  15. )
  16. logger = logging.getLogger(__name__)
  17. logger.setLevel(logging.INFO)
  18. """
  19. This is a test script for the loader system.
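  20. It initializes the robot controller, GPIO, I2C handler, and pump controller (the vision system is currently disabled), then repeats a pick/drop-off/insert movement sequence each time Enter is pressed.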
  21. """
  22. async def wait_for_enter():
  23. # Use asyncio.Event to coordinate between input and async code
  24. event = asyncio.Event()
  25. def input_callback():
  26. input() # Wait for Enter
  27. event.set()
  28. # Run input in a separate thread to not block the event loop
  29. loop = asyncio.get_running_loop()
  30. await loop.run_in_executor(None, input_callback)
  31. await event.wait()
  32. class LoaderSystem:
  33. def __init__(self):
  34. self.config = ConfigParser().config
  35. gpio_config = self.config.gpio
  36. if gpio_config.debug:
  37. self.gpio = MockGPIO()
  38. else:
  39. self.gpio = PiGPIO(out_pins=[gpio_config.pump_pin, gpio_config.valve_pin])
  40. self.controller = RobotController(self.config, self.gpio)
  41. # self.vision = DataMatrixReader(self.config)
  42. logger.info("Initializing LoaderSystem")
  43. # self.vision.initialize()
  44. # Test stuff
  45. self.test_dropoff = self.config.dropoff_grades[0]
  46. self.test_slot = self.config.measurement_devices[0].slots[3]
  47. # Use mock I2C device if debug is enabled
  48. i2c_device_class = MCP3428 if not self.config.i2c.debug else MockI2C
  49. self.i2c = I2C(i2c_device_class)
  50. self.i2c.initialize()
  51. logger.info(f"I2C initialized with {i2c_device_class.__name__}")
  52. self.pump_controller = PumpController(self.config, self.gpio)
  53. async def run(self):
  54. await self.controller.connect()
  55. await asyncio.gather(
  56. self._loader_loop(),
  57. self._poll_i2c_channels()
  58. )
  59. async def _poll_i2c_channels(self):
  60. while True:
  61. try:
  62. readings = await self.i2c.read_channels([1, 3, 4])
  63. for channel, value in readings.items():
  64. logger.debug(f"Channel {channel} reading: {value}")
  65. if channel == 3: # Pressure reading
  66. self.pump_controller.handle_tank_reading(value)
  67. if channel == 4:
  68. state = self.pump_controller.check_endeffector_state(value)
  69. self.controller.set_suction_state(state)
  70. except Exception as e:
  71. logger.error(f"Error polling I2C channels: {str(e)}")
  72. await asyncio.sleep(1) # Poll every second
  73. async def _loader_loop(self):
  74. while True:
  75. await wait_for_enter()
  76. logger.info("Starting movement sequence...")
  77. await self.controller.pick_cell_from_slot(self.test_slot)
  78. await self.controller.dropoff_cell(self.test_dropoff)
  79. await self.controller.pick_cell_from_feeder()
  80. cell = Cell(id="test_cell", status=CellStatus.WAITING)
  81. await self.controller.insert_cell_to_slot(cell, self.test_slot)
  82. logger.info("\nPress Enter to repeat sequence (or Ctrl+C to exit)...")
  83. async def cleanup(self):
  84. logger.info("Cleaning up resources...")
  85. await self.controller.cleanup()
  86. self.gpio.cleanup()
  87. if __name__ == "__main__":
  88. loader_system = LoaderSystem()
  89. async def shutdown():
  90. logger.info("Shutting down...")
  91. await loader_system.cleanup()
  92. sys.exit(0)
  93. def handle_signal(sig, frame):
  94. asyncio.get_event_loop().create_task(shutdown())
  95. signal.signal(signal.SIGINT, handle_signal)
  96. signal.signal(signal.SIGTERM, handle_signal)
  97. asyncio.run(loader_system.run())