main.py 3.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192
  1. import asyncio
  2. import logging
  3. from robot_control.src.robot.controller import RobotController
  4. from robot_control.src.utils.config import ConfigParser
  5. from robot_control.src.vision.datamatrix import DataMatrixReader
  6. from robot_control.src.api.i2c_handler import I2C, MockI2C
  7. from robot_control.src.vendor.mcp3428 import MCP3428
  8. from robot_control.src.robot.pump_controller import PumpController
  9. from robot_control.src.api.gpio import PiGPIO, MockGPIO
  10. from robot_control.src.utils.logging import setup_logging
  11. class LoaderSystem:
  12. def __init__(self):
  13. self.config = ConfigParser().config
  14. setup_logging(self.config)
  15. gpio_config = self.config.gpio
  16. if gpio_config.debug:
  17. self.gpio = MockGPIO()
  18. else:
  19. self.gpio = PiGPIO(out_pins=[gpio_config.pump_pin, gpio_config.valve_pin])
  20. self.logger = logging.getLogger(__name__)
  21. self.vision = DataMatrixReader(self.config.vision)
  22. self.logger.info("Initializing LoaderSystem")
  23. self.vision.initialize()
  24. # Use mock I2C device if debug is enabled
  25. i2c_device_class = MCP3428 if not self.config.i2c.debug else MockI2C
  26. self.i2c = I2C(i2c_device_class)
  27. self.i2c.initialize()
  28. self.logger.info(f"I2C initialized with {i2c_device_class.__name__}")
  29. self.pump_controller = PumpController(self.config, self.gpio)
  30. # Pass all hardware interfaces to the controller
  31. self.controller = RobotController(
  32. self.config,
  33. self.gpio,
  34. self.i2c,
  35. self.vision,
  36. self.pump_controller
  37. )
  38. async def run(self):
  39. await self.controller.connect()
  40. try:
  41. await asyncio.gather(
  42. self._loader_loop(),
  43. self._poll_i2c_channels()
  44. )
  45. finally:
  46. self.cleanup()
  47. self.logger.info("Cleaning up resources...")
  48. async def _poll_i2c_channels(self):
  49. while True:
  50. try:
  51. readings = await self.i2c.read_channels([1, 3, 4])
  52. for channel, value in readings.items():
  53. self.logger.debug(f"Channel {channel} reading: {value}")
  54. if channel == 3: # Pressure reading
  55. self.pump_controller.handle_tank_reading(value)
  56. if channel == 4:
  57. state = self.pump_controller.check_endeffector_state(value)
  58. self.controller.set_suction_state(state)
  59. except Exception as e:
  60. self.logger.error(f"Error polling I2C channels: {str(e)}")
  61. await asyncio.sleep(1) # Poll every second
  62. async def _loader_loop(self):
  63. """
  64. Main loop for the loader system.
  65. Orchestrates cell preparation, slot filling, and measurement processing.
  66. """
  67. while True:
  68. await asyncio.sleep(0.1) # avoid busy loop
  69. while True:
  70. # Prepare a cell in the feeder (returns True if a cell is ready)
  71. if not await self.controller.prepare_feeder_cell():
  72. break
  73. # Fill the next free slot (returns True if a cell was placed)
  74. if not await self.controller.fill_next_free_slot():
  75. break
  76. # Check for completed measurements and sort cell
  77. await self.controller.process_finished_measurement()
  78. def cleanup(self):
  79. self.gpio.cleanup() # Ensure PumpController cleans up gpio
  80. if __name__ == "__main__":
  81. loader_system = LoaderSystem()
  82. asyncio.run(loader_system.run())