# mag_distributor.py
import asyncio
import logging
from typing import Optional

from robot_control.src.api.gpio import GPIOInterface
from robot_control.src.utils.config import RobotConfig
  6. class MagDistributor:
  7. def __init__(self, config: RobotConfig, gpio: GPIOInterface):
  8. self.config = config
  9. self.gpio = gpio
  10. # Pin assignments from config
  11. gpio_conf = config.gpio
  12. self.linear_dir_pin = gpio_conf.mag_dist_pos_dir_pin
  13. self.linear_step_pin = gpio_conf.mag_dist_pos_step_pin
  14. # self.linear_en_pin = gpio_conf.mag_dist_pos_en_pin
  15. self.rot_dir_pin = gpio_conf.mag_dist_rot_dir_pin
  16. self.rot_step_pin = gpio_conf.mag_dist_rot_step_pin
  17. # self.rot_en_pin = gpio_conf.mag_dist_rot_en_pin
  18. # Endstop pins
  19. self.linear_limit_pin = gpio_conf.mag_dist_pos_limit_pin
  20. self.rot_limit_pin = gpio_conf.mag_dist_rot_limit_pin
  21. # Cell pick sensor pin
  22. self.mag_dist_sensor_pin = gpio_conf.mag_dist_sensor_pin # <-- Add this to your config
  23. # Max travel (mm or steps, as appropriate)
  24. self.full_rot_steps = 3200 # Assuming 3200 steps for a full rotation
  25. self.linear_length_mm = config.mag_distributor.length_mm
  26. self.steps_per_mm = self.full_rot_steps / 40 # Assuming 40mm per rotation
  27. self.max_speed_mms = config.mag_distributor.max_speed_mms # mm/s
  28. self.home_speed_mms = config.mag_distributor.home_speed_mms # mm/s
  29. self.debug = config.mag_distributor.debug
  30. # Current position tracking (steps or mm/deg)
  31. self.curr_pos_mm = 0
  32. self.curr_rot_deg = 0
  33. # Track current feeding magazine
  34. self.current_magazine = 0
  35. # Track magazines already checked for picking
  36. self.empty_magazines : set[int] = set()
  37. self.logger = logging.getLogger(__name__)
  38. def _speed_to_step_delay(self, speed_mm_s):
  39. """Convert speed in mm/s to step delay in ms for do_step."""
  40. steps_per_s = speed_mm_s * self.steps_per_mm
  41. if steps_per_s == 0:
  42. return 1000 # fallback to slow
  43. step_delay_s = 1.0 / steps_per_s
  44. return int(step_delay_s * 1000) # ms
  45. async def home(self):
  46. """Home both axes using endstops."""
  47. self.logger.info("Homing linear axis...")
  48. max_linear_steps = int(self.linear_length_mm * self.steps_per_mm)
  49. home_step_delay = self._speed_to_step_delay(self.home_speed_mms)
  50. await self._home_axis(self.linear_dir_pin, self.linear_step_pin, self.linear_limit_pin, axis='linear', max_steps=max_linear_steps, step_delay=home_step_delay)
  51. self.curr_pos_mm = 0
  52. self.logger.info("Homing rotational axis...")
  53. await self._home_axis(self.rot_dir_pin, self.rot_step_pin, self.rot_limit_pin, axis='rot', max_steps=self.full_rot_steps, step_delay=home_step_delay)
  54. self.curr_rot_deg = 0
  55. self.logger.info("Mag distributor homed successfully.")
  56. async def _home_axis(self, dir_pin, step_pin, endstop_pin, axis='linear', max_steps = 10000, step_delay=200):
  57. if self.debug:
  58. return
  59. # Move in negative direction until endstop is triggered
  60. for _ in range(max_steps):
  61. if self.gpio.get_pin(endstop_pin):
  62. break
  63. self.gpio.do_step(dir_pin, step_pin, 1, step_delay, direction=True)
  64. await asyncio.sleep(0.001)
  65. else:
  66. raise RuntimeError(f"{axis} axis homing failed")
  67. self.logger.info(f"{axis} axis homed.")
  68. def move_mag_distributor_at_pos(self, pos_target, rot_target, step_delay=None):
  69. if self.debug:
  70. return
  71. self.logger.info(f"Moving mag distributor to linear: {pos_target} mm, rot: {rot_target} deg")
  72. linear_steps = int(abs(pos_target - self.curr_pos_mm) * self.steps_per_mm)
  73. rot_steps = int(abs(rot_target - self.curr_rot_deg) / 360 * self.full_rot_steps)
  74. linear_dir = True if pos_target > self.curr_pos_mm else False
  75. rot_dir = True if rot_target > self.curr_rot_deg else False
  76. # Use max speed for normal moves if not specified
  77. if step_delay is None:
  78. step_delay = self._speed_to_step_delay(self.max_speed_mms)
  79. if linear_steps > 0:
  80. self.gpio.do_step(self.linear_dir_pin, self.linear_step_pin, linear_steps, step_delay, linear_dir)
  81. self.curr_pos_mm = pos_target
  82. if rot_steps > 0:
  83. self.gpio.do_step(self.rot_dir_pin, self.rot_step_pin, rot_steps, step_delay, rot_dir)
  84. self.curr_rot_deg = rot_target
  85. def reset_empty_magazines(self):
  86. """Reset the set of magazines already checked for picking."""
  87. self.empty_magazines.clear()
  88. def mag_to_feeder(self, magazine_id: Optional[int] = None):
  89. """
  90. Move a cell from a magazine to the feeder.
  91. If magazine_id is None, use current_magazine and try next if pick fails.
  92. """
  93. magazines = self.config.feeder_magazines
  94. if magazine_id is not None:
  95. mags_to_try = [magazine_id]
  96. else:
  97. # Only try magazines that have not been checked yet
  98. mags_to_try = [i for i in (list(range(self.current_magazine, len(magazines))) + list(range(0, self.current_magazine)))
  99. if i not in self.empty_magazines]
  100. # Check if cell can be picked from magazines, if not add to empty magazines and continue
  101. for mag_id in mags_to_try:
  102. pos = magazines[mag_id].mag_pos
  103. self.move_mag_distributor_at_pos(pos.pos_mm, pos.rot_deg)
  104. # Check cell pick sensor
  105. if self.mag_dist_sensor_pin and self.gpio.get_pin(self.mag_dist_sensor_pin):
  106. pos = self.config.feeder.mag_pos
  107. self.move_mag_distributor_at_pos(pos.pos_mm, pos.rot_deg)
  108. self.logger.info(f"Cell successfully picked from magazine {mag_id} and deposited to feeder.")
  109. self.current_magazine = mag_id # update current
  110. return
  111. else:
  112. self.logger.warning(f"Failed to pick cell from magazine {mag_id}. Trying next magazine.")
  113. self.empty_magazines.add(mag_id)
  114. continue
  115. self.logger.warning("No more available magazines to pick from. All attempts failed.")
  116. def defeeder_to_mag(self, magazine_id: int):
  117. """
  118. Move a cell from the defeeder to a specific magazine.
  119. Includes checks for valid magazine_id and sensor confirmation.
  120. """
  121. # Check magazine_id validity
  122. magazines = self.config.defeeder_magazines
  123. if magazine_id < 0 or magazine_id >= len(magazines):
  124. self.logger.error(f"Invalid magazine_id: {magazine_id}")
  125. return
  126. # Move to defeeder position
  127. pos = self.config.defeeder.mag_pos
  128. self.move_mag_distributor_at_pos(pos.pos_mm, pos.rot_deg)
  129. # Optionally check for cell presence at defeeder (if sensor available)
  130. if self.mag_dist_sensor_pin and not self.gpio.get_pin(self.mag_dist_sensor_pin):
  131. self.logger.warning("No cell detected at defeeder position.")
  132. return
  133. # Move to target magazine position
  134. pos = magazines[magazine_id].mag_pos
  135. self.move_mag_distributor_at_pos(pos.pos_mm, pos.rot_deg)
  136. # Optionally check for successful placement (if sensor logic applies)
  137. self.logger.info(f"Cell collected from defeeder and moved to magazine {magazine_id}.")