# main.py
  1. import asyncio
  2. import logging
  3. from robot_control.src.robot.controller import RobotController
  4. from robot_control.src.utils.config import ConfigParser
  5. from robot_control.src.vision.datamatrix import DataMatrixReader
  6. from robot_control.src.api.i2c_handler import I2C, MockI2C
  7. from robot_control.src.vendor.mcp3428 import MCP3428
  8. from robot_control.src.robot.pump_controller import PumpController
  9. from robot_control.src.api.gpio import PiGPIO, MockGPIO
  10. logging.basicConfig(
  11. level=logging.INFO,
  12. format='%(asctime)s - %(module)s - %(levelname)s - %(message)s',
  13. )
  14. class LoaderSystem:
  15. def __init__(self):
  16. self.config = ConfigParser().config
  17. gpio_config = self.config.gpio
  18. if gpio_config.debug:
  19. self.gpio = MockGPIO()
  20. else:
  21. self.gpio = PiGPIO(out_pins=[gpio_config.pump_pin, gpio_config.valve_pin])
  22. self.logger = logging.getLogger(__name__)
  23. self.controller = RobotController(self.config, self.gpio)
  24. self.vision = DataMatrixReader(self.config.vision)
  25. self.logger.info("Initializing LoaderSystem")
  26. self.vision.initialize()
  27. # Use mock I2C device if debug is enabled
  28. i2c_device_class = MCP3428 if not self.config.i2c.debug else MockI2C
  29. self.i2c = I2C(i2c_device_class)
  30. self.i2c.initialize()
  31. self.logger.info(f"I2C initialized with {i2c_device_class.__name__}")
  32. self.pump_controller = PumpController(self.config, self.gpio)
  33. self.next_cell_id = ""
  34. async def run(self):
  35. await self.controller.connect()
  36. try:
  37. await asyncio.gather(
  38. self._loader_loop(),
  39. self._poll_i2c_channels()
  40. )
  41. finally:
  42. self.cleanup()
  43. self.logger.info("Cleaning up resources...")
  44. async def _poll_i2c_channels(self):
  45. while True:
  46. try:
  47. readings = await self.i2c.read_channels([1, 3, 4])
  48. for channel, value in readings.items():
  49. self.logger.debug(f"Channel {channel} reading: {value}")
  50. if channel == 3: # Pressure reading
  51. self.pump_controller.handle_tank_reading(value)
  52. if channel == 4:
  53. state = self.pump_controller.check_endeffector_state(value)
  54. self.controller.set_suction_state(state)
  55. except Exception as e:
  56. self.logger.error(f"Error polling I2C channels: {str(e)}")
  57. await asyncio.sleep(1) # Poll every second
  58. def check_cell_voltage(self, voltage, cell_id_str = ""):
  59. if voltage < abs(self.config.feeder.min_voltage):
  60. self.logger.info(f"Cell {cell_id_str} voltage too low, discarding cell")
  61. return False
  62. if voltage < 0:
  63. self.logger.info(f"Cell {cell_id_str} has wrong polarity, discarding cell")
  64. return False
  65. self.logger.info(f"Cell {cell_id_str} voltage({voltage}) is good")
  66. return True
  67. async def _prepare_feeder_cell(self) -> None:
  68. """
  69. Handles the process of preparing a cell in the feeder. Loops until a suitable cell is found or no cell is detected.
  70. Returns the cell_id_str if a cell is prepared, otherwise an empty string.
  71. """
  72. io_conf = self.config.gpio
  73. while not self.next_cell_id:
  74. self.next_cell_id = self.vision.read_datamatrix()
  75. if not self.next_cell_id:
  76. self.logger.debug("No cell detected")
  77. return # No cell detected
  78. self.gpio.set_pin(io_conf.probe_pin, 1)
  79. await asyncio.sleep(0.1) # Wait for probe to deploy
  80. cell_v = await self.i2c.read_channel(1) # Measure cell voltage
  81. self.gpio.set_pin(io_conf.probe_pin, 0)
  82. self.logger.debug(f"Cell voltage: {cell_v}")
  83. if self.check_cell_voltage(cell_v, self.next_cell_id):
  84. return # Desired case!
  85. # Discard cell directly from feeder
  86. self.logger.info(f"Cell {self.next_cell_id} voltage({cell_v}) is bad, discarding cell")
  87. self.gpio.do_step(io_conf.measure_dir_pin, io_conf.measure_step_pin, 1600, 200) # Exactly half a turn
  88. await asyncio.sleep(1) # Wait for cell to be ejected
  89. try:
  90. self.next_cell_id = ""
  91. await self.controller.pick_cell_from_feeder()
  92. await self.controller.dropoff_cell()
  93. except Exception as e:
  94. self.logger.error(f"Failed to process cell {self.next_cell_id}: {str(e)}")
  95. return
  96. async def _loader_loop(self):
  97. """
  98. Main loop for the loader system.
  99. Checks for free slots and tries to fill them with cells.
  100. If no more free slots are available, it checks for completed measurements
  101. and sorts the cells accordingly.
  102. """
  103. while True:
  104. await asyncio.sleep(0.1) # avoid busy loop
  105. # Check for free slots loop
  106. while True:
  107. # Discard cells until acceptable cell is found as next_cell_id
  108. if not self.next_cell_id:
  109. await self._prepare_feeder_cell()
  110. slot = self.controller.get_next_free_slot()
  111. if not slot:
  112. break # No free slots available
  113. if not self.next_cell_id:
  114. break # Continue with process_finished_measurement
  115. # Pick and place new cell
  116. cell_id = int(self.next_cell_id)
  117. self.logger.info(f"Processing cell {cell_id}")
  118. cell = self.controller.add_cell(cell_id)
  119. io_conf = self.config.gpio
  120. try:
  121. self.gpio.do_step(io_conf.measure_dir_pin, io_conf.measure_step_pin, 1600, 200) # Exactly half a turn
  122. await asyncio.sleep(1) # Wait for cell to be ejected
  123. await self.controller.pick_cell_from_feeder()
  124. self.feeder_prepared = False
  125. await self.controller.insert_cell_to_slot(cell, slot)
  126. except Exception as e:
  127. self.logger.error(f"Failed to process cell {cell_id}: {str(e)}")
  128. break
  129. # Check for completed measurements and sort cell
  130. await self.controller.process_finished_measurement()
  131. def cleanup(self):
  132. self.gpio.cleanup() # Ensure PumpController cleans up gpio
  133. if __name__ == "__main__":
  134. loader_system = LoaderSystem()
  135. asyncio.run(loader_system.run())