# integration_test.py
  1. import signal
  2. import sys
  3. import asyncio
  4. import logging
  5. from robot_control.src.robot.controller import RobotController, Cell, CellStatus
  6. from robot_control.src.utils.config import ConfigParser, DefeederMagazineConfig
  7. from robot_control.src.utils.logging import setup_logging
  8. from robot_control.src.vision.datamatrix import DataMatrixReader
  9. from robot_control.src.api.i2c_handler import I2C, MockI2C
  10. from robot_control.src.vendor.mcp3428 import MCP3428
  11. from robot_control.src.robot.pump_controller import PumpController
  12. from robot_control.src.api.gpio import PiGPIO, MockGPIO
  13. from robot_control.src.robot.mag_distributor import MagDistributor
"""
This is a test script for the loader system.
It initializes the robot controller, vision system, and I2C handler,
then runs the loader loop alongside periodic polling of the I2C channels.
"""
  18. # Configure root logger
  19. logging.basicConfig(
  20. level=logging.INFO, # show INFO and above (use DEBUG for more details)
  21. format="%(asctime)s [%(levelname)s] %(name)s: %(message)s"
  22. )
  23. # Make sure all existing loggers propagate to root
  24. for name, logger_obj in logging.Logger.manager.loggerDict.items():
  25. if isinstance(logger_obj, logging.Logger):
  26. logger_obj.setLevel(logging.INFO)
  27. logger_obj.propagate = True
  28. async def wait_for_enter():
  29. # Use asyncio.Event to coordinate between input and async code
  30. event = asyncio.Event()
  31. def input_callback():
  32. logging.info("Press Enter to continue...")
  33. input() # Wait for Enter
  34. event.set()
  35. # Run input in a separate thread to not block the event loop
  36. loop = asyncio.get_running_loop()
  37. await loop.run_in_executor(None, input_callback)
  38. await event.wait()
  39. class LoaderSystem:
  40. def __init__(self):
  41. self.config = ConfigParser().config
  42. setup_logging(self.config) # Set up logging with config
  43. gpio_config = self.config.gpio
  44. if gpio_config.debug:
  45. self.gpio = MockGPIO()
  46. else:
  47. self.gpio = PiGPIO(out_pins=[gpio_config.pump_pin, gpio_config.valve_pin, gpio_config.mag_dist_pos_dir_pin, gpio_config.mag_dist_pos_step_pin, gpio_config.mag_dist_pos_en_pin])
  48. # Initialize vision system
  49. self.vision = DataMatrixReader(self.config.vision)
  50. # Initialize pump controller
  51. self.pump_controller = PumpController(self.config, self.gpio)
  52. # Initialize magazine distributor
  53. self.mag_distributor = MagDistributor(self.config, self.gpio)
  54. i2c_device_class = MCP3428 if not self.config.i2c.debug else MockI2C
  55. self.i2c = I2C(i2c_device_class)
  56. self.i2c.initialize()
  57. self.feeder_queue: asyncio.Queue[int] = asyncio.Queue(self.config.feeder.max_num_cells)
  58. self.defeeder_queue: asyncio.Queue[DefeederMagazineConfig] = asyncio.Queue(self.config.defeeder.max_num_cells)
  59. # Initialize robot controller with all required arguments
  60. self.controller = RobotController(
  61. self.config,
  62. self.gpio,
  63. self.i2c,
  64. self.vision,
  65. self.pump_controller,
  66. self.feeder_queue,
  67. self.defeeder_queue
  68. )
  69. self.logger = logging.getLogger(__name__)
  70. self.logger.info("Initializing LoaderSystem")
  71. # Test stuff
  72. self.test_drop_slot = self.config.measurement_devices[0].slots[2]
  73. self.test_pickup_slot = self.config.measurement_devices[0].slots[3]
  74. # Use mock I2C device if debug is enabled
  75. self.i2c.initialize()
  76. self.logger.info(f"I2C initialized with {i2c_device_class.__name__}")
  77. self.pump_controller = PumpController(self.config, self.gpio)
  78. async def run(self):
  79. await self.controller.connect()
  80. await asyncio.gather(
  81. self._loader_loop(),
  82. self._poll_i2c_channels()
  83. )
  84. async def _poll_i2c_channels(self):
  85. while True:
  86. try:
  87. readings = await self.i2c.read_channels([1, 3, 4])
  88. for channel, value in readings.items():
  89. self.logger.debug(f"Channel {channel} reading: {value}")
  90. if channel == 3: # Pressure reading
  91. self.pump_controller.handle_tank_reading(value)
  92. if channel == 4:
  93. state = self.pump_controller.check_endeffector_state(value)
  94. self.controller.set_suction_state(state)
  95. except Exception as e:
  96. self.logger.error(f"Error polling I2C channels: {str(e)}")
  97. await asyncio.sleep(1) # Poll every second
  98. async def _loader_loop(self):
  99. while True:
  100. await self.mag_distributor.home()
  101. self.mag_distributor.mag_to_feeder()
  102. await self.feeder_queue.put(1)
  103. await self.controller.prepare_feeder_cell()
  104. # await self.controller.pick_c
  105. # ell_from_feeder()
  106. # cell = Cell(id=1, status=CellStatus.WAITING)
  107. # await self.controller.insert_cell_to_slot(cell, self.test_drop_slot)
  108. # await self.controller.pick_cell_from_slot(self.test_pickup_slot)
  109. # await self.controller.dropoff_cell()
  110. # self.mag_distributor.defeeder_to_mag(self.config.defeeder_magazines[0])
  111. # await wait_for_enter()
  112. # # Feeding with MagDistributor
  113. # ###########################
  114. # self.logger.info("Homing the Magazin Distributor Axis...")
  115. # await self.mag_distributor.home()
  116. # self.logger.info("Picking up a cell from magazine and placing it into feeder using MagDistributor...")
  117. # self.mag_distributor.mag_to_feeder()
  118. # await self.feeder_queue.put(1)
  119. # self.logger.info("Done.")
  120. # await wait_for_enter()
  121. # # Prepare Feeder Cell
  122. # ##############################
  123. # self.logger.info("Preparing feeder cell...")
  124. # await self.controller.prepare_feeder_cell()
  125. # self.logger.info("Done.")
  126. # await wait_for_enter()
  127. # # Feeder -> Slot
  128. # ##############################
  129. # self.logger.info("Picking up a cell from feeder and placing it into slot...")
  130. # await self.controller.pick_cell_from_feeder()
  131. # cell = Cell(id=1, status=CellStatus.WAITING)
  132. # await self.controller.insert_cell_to_slot(cell, self.test_drop_slot)
  133. # await wait_for_enter()
  134. # # Slot -> Defeeder
  135. # ##############################
  136. # self.logger.info("Picking up a cell from slot and placing it into defeeder...")
  137. # await self.controller.pick_cell_from_slot(self.test_pickup_slot)
  138. # await self.controller.dropoff_cell()
  139. # self.logger.info("Done.")
  140. # await wait_for_enter()
  141. # # Defeeding with MagDistributor
  142. # ###########################
  143. # self.logger.info("Defeeding a cell from feeder to magazine using MagDistributor...")
  144. # self.mag_distributor.defeeder_to_mag(self.config.defeeder_magazines[0])
  145. # self.logger.info("Done.")
  146. # self.logger.info("\nPress Enter to repeat sequence (or Ctrl+C to exit)...")
  147. async def cleanup(self):
  148. self.logger.info("Cleaning up resources...")
  149. await self.controller.cleanup()
  150. self.gpio.cleanup()
  151. if __name__ == "__main__":
  152. loader_system = LoaderSystem()
  153. async def shutdown():
  154. loader_system.logger.info("Shutting down...")
  155. await loader_system.cleanup()
  156. sys.exit(0)
  157. def handle_signal(sig, frame):
  158. asyncio.get_event_loop().create_task(shutdown())
  159. signal.signal(signal.SIGINT, handle_signal)
  160. signal.signal(signal.SIGTERM, handle_signal)
  161. asyncio.run(loader_system.run())