main.py 3.6 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788
  1. import asyncio
  2. from robot_control.src.utils.logging import LoggerSingleton
  3. from robot_control.src.robot.controller import RobotController
  4. from robot_control.src.utils.config import ConfigParser
  5. from robot_control.src.vision.datamatrix import DataMatrixReader
  6. from robot_control.src.api.i2c_handler import I2C
  7. from robot_control.src.vendor.mcp3428 import MCP3428
  8. from robot_control.src.robot.pump_controller import PumpController
  9. from robot_control.src.api.gpio import PiGPIO
  10. class LoaderSystem:
  11. def __init__(self):
  12. pump_pin = self.config.get_gpio_config().get("pump_pin")
  13. valve_pin = self.config.get_gpio_config().get("valve_pin")
  14. self.gpio = PiGPIO([pump_pin, valve_pin], [])
  15. self.config = ConfigParser()
  16. self.logger = LoggerSingleton.get_logger(self.config)
  17. self.controller = RobotController(self.config, self.gpio)
  18. cam = self.config.get_vision_config().get("camera_id")
  19. self.vision = DataMatrixReader(cam)
  20. self.logger.info("Initializing LoaderSystem")
  21. self.vision.initialize()
  22. self.i2c = I2C(MCP3428)
  23. self.i2c.initialize()
  24. self.logger.info("I2C initialized for MCP3428")
  25. self.pump_controller = PumpController(self.config, self.gpio, self.logger)
  26. async def run(self):
  27. await self.controller.connect()
  28. await asyncio.gather(
  29. self._loader_loop(),
  30. self._poll_i2c_channels()
  31. )
  32. async def _poll_i2c_channels(self):
  33. while True:
  34. try:
  35. readings = await self.i2c.read_channels([1, 3, 4])
  36. for channel, value in readings.items():
  37. self.logger.info(f"Channel {channel} reading: {value}")
  38. if channel == 3: # Pressure reading
  39. self.pump_controller.handle_tank_reading(value)
  40. if channel == 4:
  41. state = self.pump_controller.check_endeffector_state(value)
  42. self.controller.set_suction_state(state)
  43. # Perform a generic task based on the reading
  44. if value > 0.5: # Example condition
  45. self.logger.warning(f"Value {value} on channel {channel} exceeds threshold")
  46. except Exception as e:
  47. self.logger.error(f"Error polling I2C channels: {str(e)}")
  48. await asyncio.sleep(1) # Poll every second
  49. async def _loader_loop(self):
  50. while True:
  51. await asyncio.sleep(0.1) # avoid busy loop
  52. # Check for free slots loop
  53. while True:
  54. slot = self.controller.get_next_free_slot()
  55. if not slot:
  56. break
  57. # Pick and place new cell
  58. cell_id = self.vision.read_datamatrix()
  59. if not cell_id:
  60. self.logger.debug("No cell detected")
  61. break
  62. self.logger.info(f"Processing cell {cell_id}")
  63. cell = self.controller.add_cell(cell_id)
  64. try:
  65. await self.controller.pick_cell_from_feeder()
  66. await self.controller.insert_cell_to_slot(cell, slot)
  67. except Exception as e:
  68. self.logger.error(f"Failed to process cell {cell_id}: {str(e)}")
  69. break
  70. # Check for completed measurements and sort cell
  71. await self.controller.process_finished_measurement()
  72. def __del__(self):
  73. self.pump_controller.cleanup() # Ensure PumpController cleans up gpio
  74. if __name__ == "__main__":
  75. loader_system = LoaderSystem()
  76. asyncio.run(loader_system.run())