mag_distributor.py 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158
  1. import logging
  2. import asyncio
  3. from robot_control.src.utils.config import RobotConfig
  4. from robot_control.src.api.gpio import GPIOInterface
  5. from typing import Optional
  6. class MagDistributor:
  7. def __init__(self, config: RobotConfig, gpio: GPIOInterface):
  8. self.config = config
  9. self.gpio = gpio
  10. # Pin assignments from config
  11. gpio_conf = config.gpio
  12. self.linear_dir_pin = gpio_conf.mag_dist_pos_dir_pin
  13. self.linear_step_pin = gpio_conf.mag_dist_pos_step_pin
  14. # self.linear_en_pin = gpio_conf.mag_dist_pos_en_pin
  15. self.rot_dir_pin = gpio_conf.mag_dist_rot_dir_pin
  16. self.rot_step_pin = gpio_conf.mag_dist_rot_step_pin
  17. # self.rot_en_pin = gpio_conf.mag_dist_rot_en_pin
  18. # Endstop pins
  19. self.linear_limit_pin = gpio_conf.mag_dist_pos_limit_pin
  20. self.rot_limit_pin = gpio_conf.mag_dist_rot_limit_pin
  21. # Cell pick sensor pin
  22. self.mag_dist_sensor_pin = gpio_conf.mag_dist_sensor_pin # <-- Add this to your config
  23. # Max travel (mm or steps, as appropriate)
  24. self.full_rot_steps = 3200 # Assuming 3200 steps for a full rotation
  25. self.linear_length_mm = config.mag_distributor.length_mm
  26. self.steps_per_mm = self.full_rot_steps / 40 # Assuming 40mm per rotation
  27. self.max_speed = config.mag_distributor.max_speed # mm/s
  28. self.home_speed = config.mag_distributor.home_speed # mm/s
  29. # Current position tracking (steps or mm/deg)
  30. self.curr_pos_mm = 0
  31. self.curr_rot_deg = 0
  32. # Track current feeding magazine
  33. self.current_magazine = 0
  34. # Track magazines already checked for picking
  35. self.empty_magazines : set[int] = set()
  36. self.logger = logging.getLogger(__name__)
  37. def _speed_to_step_delay(self, speed_mm_s):
  38. """Convert speed in mm/s to step delay in ms for do_step."""
  39. steps_per_s = speed_mm_s * self.steps_per_mm
  40. if steps_per_s == 0:
  41. return 1000 # fallback to slow
  42. step_delay_s = 1.0 / steps_per_s
  43. return int(step_delay_s * 1000) # ms
  44. async def home(self):
  45. """Home both axes using endstops."""
  46. self.logger.info("Homing linear axis...")
  47. max_linear_steps = int(self.linear_length_mm * self.steps_per_mm)
  48. home_step_delay = self._speed_to_step_delay(self.home_speed)
  49. await self._home_axis(self.linear_dir_pin, self.linear_step_pin, self.linear_limit_pin, axis='linear', max_steps=max_linear_steps, step_delay=home_step_delay)
  50. self.curr_pos_mm = 0
  51. self.logger.info("Homing rotational axis...")
  52. await self._home_axis(self.rot_dir_pin, self.rot_step_pin, self.rot_limit_pin, axis='rot', max_steps=self.full_rot_steps, step_delay=home_step_delay)
  53. self.curr_rot_deg = 0
  54. self.logger.info("Mag distributor homed successfully.")
  55. async def _home_axis(self, dir_pin, step_pin, endstop_pin, axis='linear', max_steps = 10000, step_delay=200):
  56. # Move in negative direction until endstop is triggered
  57. for _ in range(max_steps):
  58. if self.gpio.get_pin(endstop_pin):
  59. break
  60. self.gpio.do_step(dir_pin, step_pin, 1, step_delay, direction=True)
  61. await asyncio.sleep(0.001)
  62. else:
  63. raise RuntimeError(f"{axis} axis homing failed")
  64. self.logger.info(f"{axis} axis homed.")
  65. def move_mag_distributor_at_pos(self, pos_target, rot_target, step_delay=None):
  66. self.logger.info(f"Moving mag distributor to linear: {pos_target} mm, rot: {rot_target} deg")
  67. linear_steps = int(abs(pos_target - self.curr_pos_mm) * self.steps_per_mm)
  68. rot_steps = int(abs(rot_target - self.curr_rot_deg) / 360 * self.full_rot_steps)
  69. linear_dir = True if pos_target > self.curr_pos_mm else False
  70. rot_dir = True if rot_target > self.curr_rot_deg else False
  71. # Use max speed for normal moves if not specified
  72. if step_delay is None:
  73. step_delay = self._speed_to_step_delay(self.max_speed)
  74. if linear_steps > 0:
  75. self.gpio.do_step(self.linear_dir_pin, self.linear_step_pin, linear_steps, step_delay, linear_dir)
  76. self.curr_pos_mm = pos_target
  77. if rot_steps > 0:
  78. self.gpio.do_step(self.rot_dir_pin, self.rot_step_pin, rot_steps, step_delay, rot_dir)
  79. self.curr_rot_deg = rot_target
  80. def reset_empty_magazines(self):
  81. """Reset the set of magazines already checked for picking."""
  82. self.empty_magazines.clear()
  83. def mag_to_feeder(self, magazine_id: Optional[int] = None):
  84. """
  85. Move a cell from a magazine to the feeder.
  86. If magazine_id is None, use current_magazine and try next if pick fails.
  87. """
  88. magazines = self.config.feeder.magazines
  89. if magazine_id is not None:
  90. mags_to_try = [magazine_id]
  91. else:
  92. # Only try magazines that have not been checked yet
  93. mags_to_try = [i for i in (list(range(self.current_magazine, len(magazines))) + list(range(0, self.current_magazine)))
  94. if i not in self.empty_magazines]
  95. # Check if cell can be picked from magazines, if not add to empty magazines and continue
  96. for mag_id in mags_to_try:
  97. x_pos = magazines[mag_id].x_pos
  98. rot_deg = magazines[mag_id].rot_deg
  99. self.move_mag_distributor_at_pos(x_pos, rot_deg)
  100. # Check cell pick sensor
  101. if self.gpio.get_pin(self.mag_dist_sensor_pin):
  102. pos = self.config.feeder.mag_pos
  103. x_pos = magazines[mag_id].x_pos
  104. rot_deg = magazines[mag_id].rot_deg
  105. self.move_mag_distributor_at_pos(pos[0], pos[1])
  106. self.logger.info(f"Cell successfully picked from magazine {mag_id} and deposited to feeder.")
  107. self.current_magazine = mag_id # update current
  108. return
  109. else:
  110. self.logger.warning(f"Failed to pick cell from magazine {mag_id}. Trying next magazine.")
  111. self.empty_magazines.add(mag_id)
  112. continue
  113. self.logger.warning("No more available magazines to pick from. All attempts failed.")
  114. def defeeder_to_mag(self, magazine_id: int):
  115. """
  116. Move a cell from the defeeder to a specific magazine.
  117. Includes checks for valid magazine_id and sensor confirmation.
  118. """
  119. # Check magazine_id validity
  120. magazines = self.config.defeeder.magazines
  121. if magazine_id < 0 or magazine_id >= len(magazines):
  122. self.logger.error(f"Invalid magazine_id: {magazine_id}")
  123. return
  124. # Move to defeeder position
  125. pos = self.config.defeeder.mag_pos
  126. self.move_mag_distributor_at_pos(pos[0], pos[1])
  127. # Optionally check for cell presence at defeeder (if sensor available)
  128. if not self.gpio.get_pin(self.mag_dist_sensor_pin):
  129. self.logger.warning("No cell detected at defeeder position.")
  130. return
  131. # Move to target magazine position
  132. self.move_mag_distributor_at_pos(magazines[magazine_id].x_pos, magazines[magazine_id].rot_deg)
  133. # Optionally check for successful placement (if sensor logic applies)
  134. self.logger.info(f"Cell collected from defeeder and moved to magazine {magazine_id}.")