main.py 3.8 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697
  1. import asyncio
  2. import logging
  3. from robot_control.src.robot.controller import RobotController
  4. from robot_control.src.utils.config import ConfigParser
  5. from robot_control.src.vision.datamatrix import DataMatrixReader
  6. from robot_control.src.api.i2c_handler import I2C, MockI2C
  7. from robot_control.src.vendor.mcp3428 import MCP3428
  8. from robot_control.src.robot.pump_controller import PumpController
  9. from robot_control.src.api.gpio import PiGPIO, MockGPIO
  10. logging.basicConfig(
  11. level=logging.INFO,
  12. format='%(asctime)s - %(module)s - %(levelname)s - %(message)s',
  13. )
  14. class LoaderSystem:
  15. def __init__(self):
  16. self.config = ConfigParser().config
  17. gpio_config = self.config.gpio
  18. if gpio_config.debug:
  19. self.gpio = MockGPIO()
  20. else:
  21. self.gpio = PiGPIO(out_pins=[gpio_config.pump_pin, gpio_config.valve_pin])
  22. self.logger = logging.getLogger(__name__)
  23. self.controller = RobotController(self.config, self.gpio)
  24. self.vision = DataMatrixReader(self.config.vision)
  25. self.logger.info("Initializing LoaderSystem")
  26. self.vision.initialize()
  27. # Use mock I2C device if debug is enabled
  28. i2c_device_class = MCP3428 if not self.config.i2c.debug else MockI2C
  29. self.i2c = I2C(i2c_device_class)
  30. self.i2c.initialize()
  31. self.logger.info(f"I2C initialized with {i2c_device_class.__name__}")
  32. self.pump_controller = PumpController(self.config, self.gpio)
  33. async def run(self):
  34. await self.controller.connect()
  35. try:
  36. await asyncio.gather(
  37. self._loader_loop(),
  38. self._poll_i2c_channels()
  39. )
  40. finally:
  41. self.cleanup()
  42. self.logger.info("Cleaning up resources...")
  43. async def _poll_i2c_channels(self):
  44. while True:
  45. try:
  46. readings = await self.i2c.read_channels([1, 3, 4])
  47. for channel, value in readings.items():
  48. self.logger.debug(f"Channel {channel} reading: {value}")
  49. if channel == 3: # Pressure reading
  50. self.pump_controller.handle_tank_reading(value)
  51. if channel == 4:
  52. state = self.pump_controller.check_endeffector_state(value)
  53. self.controller.set_suction_state(state)
  54. except Exception as e:
  55. self.logger.error(f"Error polling I2C channels: {str(e)}")
  56. await asyncio.sleep(1) # Poll every second
  57. async def _loader_loop(self):
  58. while True:
  59. await asyncio.sleep(0.1) # avoid busy loop
  60. # Check for free slots loop
  61. while True:
  62. slot = self.controller.get_next_free_slot()
  63. if not slot:
  64. break
  65. # Pick and place new cell
  66. cell_id_str = self.vision.read_datamatrix()
  67. if not cell_id_str:
  68. self.logger.debug("No cell detected")
  69. break
  70. cell_id = int(cell_id_str)
  71. self.logger.info(f"Processing cell {cell_id}")
  72. cell = self.controller.add_cell(cell_id)
  73. try:
  74. await self.controller.pick_cell_from_feeder()
  75. await self.controller.insert_cell_to_slot(cell, slot)
  76. except Exception as e:
  77. self.logger.error(f"Failed to process cell {cell_id}: {str(e)}")
  78. break
  79. # Check for completed measurements and sort cell
  80. await self.controller.process_finished_measurement()
  81. def cleanup(self):
  82. self.gpio.cleanup() # Ensure PumpController cleans up gpio
  83. if __name__ == "__main__":
  84. loader_system = LoaderSystem()
  85. asyncio.run(loader_system.run())