|
|
@@ -0,0 +1,127 @@
|
|
|
+import logging
|
|
|
+import asyncio
|
|
|
+from robot_control.src.utils.config import RobotConfig
|
|
|
+from robot_control.src.api.gpio import GPIOInterface
|
|
|
+from typing import Optional
|
|
|
+
|
|
|
+
|
|
|
+class MagDistributor:
|
|
|
+ def __init__(self, config: RobotConfig, gpio: GPIOInterface):
|
|
|
+ self.config = config
|
|
|
+ self.gpio = gpio
|
|
|
+
|
|
|
+ # Pin assignments from config
|
|
|
+ gpio_conf = config.gpio
|
|
|
+ self.linear_dir_pin = gpio_conf.mag_dist_pos_dir_pin
|
|
|
+ self.linear_step_pin = gpio_conf.mag_dist_pos_step_pin
|
|
|
+ # self.linear_en_pin = gpio_conf.mag_dist_pos_en_pin
|
|
|
+ self.rot_dir_pin = gpio_conf.mag_dist_rot_dir_pin
|
|
|
+ self.rot_step_pin = gpio_conf.mag_dist_rot_step_pin
|
|
|
+ # self.rot_en_pin = gpio_conf.mag_dist_rot_en_pin
|
|
|
+
|
|
|
+ # Endstop pins
|
|
|
+ self.linear_limit_pin = gpio_conf.mag_dist_pos_limit_pin
|
|
|
+ self.rot_limit_pin = gpio_conf.mag_dist_rot_limit_pin
|
|
|
+
|
|
|
+ # Cell pick sensor pin
|
|
|
+ self.mag_dist_sensor_pin = gpio_conf.mag_dist_sensor_pin # <-- Add this to your config
|
|
|
+
|
|
|
+ # Max travel (mm or steps, as appropriate)
|
|
|
+ self.full_rot_steps = 3200 # Assuming 3200 steps for a full rotation
|
|
|
+ self.linear_length_mm = config.mag_distributor.length_mm
|
|
|
+ self.steps_per_mm = self.full_rot_steps / 40 # Assuming 40mm per rotation
|
|
|
+ self.max_speed = config.mag_distributor.max_speed # mm/s
|
|
|
+ self.home_speed = config.mag_distributor.home_speed # mm/s
|
|
|
+
|
|
|
+ # Current position tracking (steps or mm/deg)
|
|
|
+ self.curr_pos_mm = 0
|
|
|
+ self.curr_rot_deg = 0
|
|
|
+
|
|
|
+ # Track current feeding magazine
|
|
|
+ self.current_magazine = 0
|
|
|
+
|
|
|
+ self.logger = logging.getLogger(__name__)
|
|
|
+
|
|
|
+ def _speed_to_step_delay(self, speed_mm_s):
|
|
|
+ """Convert speed in mm/s to step delay in ms for do_step."""
|
|
|
+ steps_per_s = speed_mm_s * self.steps_per_mm
|
|
|
+ if steps_per_s == 0:
|
|
|
+ return 1000 # fallback to slow
|
|
|
+ step_delay_s = 1.0 / steps_per_s
|
|
|
+ return int(step_delay_s * 1000) # ms
|
|
|
+
|
|
|
+ async def home(self):
|
|
|
+ """Home both axes using endstops."""
|
|
|
+ self.logger.info("Homing linear axis...")
|
|
|
+ max_linear_steps = int(self.linear_length_mm * self.steps_per_mm)
|
|
|
+ home_step_delay = self._speed_to_step_delay(self.home_speed)
|
|
|
+ await self._home_axis(self.linear_dir_pin, self.linear_step_pin, self.linear_limit_pin, axis='linear', max_steps=max_linear_steps, step_delay=home_step_delay)
|
|
|
+ self.curr_pos_mm = 0
|
|
|
+ self.logger.info("Homing rotational axis...")
|
|
|
+ await self._home_axis(self.rot_dir_pin, self.rot_step_pin, self.rot_limit_pin, axis='rot', max_steps=self.full_rot_steps, step_delay=home_step_delay)
|
|
|
+ self.curr_rot_deg = 0
|
|
|
+ self.logger.info("Mag distributor homed successfully.")
|
|
|
+
|
|
|
+ async def _home_axis(self, dir_pin, step_pin, endstop_pin, axis='linear', max_steps = 10000, step_delay=200):
|
|
|
+ # Move in negative direction until endstop is triggered
|
|
|
+ for _ in range(max_steps):
|
|
|
+ if self.gpio.get_pin(endstop_pin):
|
|
|
+ break
|
|
|
+ self.gpio.do_step(dir_pin, step_pin, 1, step_delay, direction=True)
|
|
|
+ await asyncio.sleep(0.001)
|
|
|
+ else:
|
|
|
+ raise RuntimeError(f"{axis} axis homing failed")
|
|
|
+ self.logger.info(f"{axis} axis homed.")
|
|
|
+
|
|
|
+ def move_mag_distributor_at_pos(self, pos_target, rot_target, step_delay=None):
|
|
|
+ self.logger.info(f"Moving mag distributor to linear: {pos_target} mm, rot: {rot_target} deg")
|
|
|
+ linear_steps = int(abs(pos_target - self.curr_pos_mm) * self.steps_per_mm)
|
|
|
+ rot_steps = int(abs(rot_target - self.curr_rot_deg) / 360 * self.full_rot_steps)
|
|
|
+ linear_dir = True if pos_target > self.curr_pos_mm else False
|
|
|
+ rot_dir = True if rot_target > self.curr_rot_deg else False
|
|
|
+ # Use max speed for normal moves if not specified
|
|
|
+ if step_delay is None:
|
|
|
+ step_delay = self._speed_to_step_delay(self.max_speed)
|
|
|
+ if linear_steps > 0:
|
|
|
+ self.gpio.do_step(self.linear_dir_pin, self.linear_step_pin, linear_steps, step_delay, linear_dir)
|
|
|
+ self.curr_pos_mm = pos_target
|
|
|
+ if rot_steps > 0:
|
|
|
+ self.gpio.do_step(self.rot_dir_pin, self.rot_step_pin, rot_steps, step_delay, rot_dir)
|
|
|
+ self.curr_rot_deg = rot_target
|
|
|
+
|
|
|
+ def mag_to_feeder(self, magazine_id: Optional[int] = None):
|
|
|
+ """
|
|
|
+ Move a cell from a magazine to the feeder.
|
|
|
+ If magazine_id is None, use current_magazine and try next if pick fails.
|
|
|
+ """
|
|
|
+ magazines = self.config.feeder.magazines
|
|
|
+ num_magazines = len(magazines)
|
|
|
+ tried = 0
|
|
|
+ if magazine_id is not None:
|
|
|
+ mags_to_try = [magazine_id]
|
|
|
+ else:
|
|
|
+ mags_to_try = list(range(self.current_magazine, num_magazines)) + list(range(0, self.current_magazine))
|
|
|
+ for mag_id in mags_to_try:
|
|
|
+ pos = magazines[mag_id].pos
|
|
|
+ self.move_mag_distributor_at_pos(pos[0], pos[1])
|
|
|
+ # Check cell pick sensor
|
|
|
+ if self.gpio.get_pin(self.mag_dist_sensor_pin):
|
|
|
+ pos = self.config.feeder.mag_pos
|
|
|
+ self.move_mag_distributor_at_pos(pos[0], pos[1])
|
|
|
+ self.logger.info(f"Cell successfully picked from magazine {mag_id} and deposited to feeder.")
|
|
|
+ self.current_magazine = mag_id # update current
|
|
|
+ return
|
|
|
+ else:
|
|
|
+ self.logger.warning(f"Failed to pick cell from magazine {mag_id}. Trying next magazine.")
|
|
|
+ tried += 1
|
|
|
+ continue
|
|
|
+ self.logger.warning("No more available magazines to pick from. All attempts failed.")
|
|
|
+
|
|
|
+ def defeeder_to_mag(self, magazine_id: int):
|
|
|
+ pos = self.config.defeeder.mag_pos
|
|
|
+ self.move_mag_distributor_at_pos(pos[0], pos[1])
|
|
|
+ pos = self.config.defeeder.magazines[magazine_id].pos
|
|
|
+ self.move_mag_distributor_at_pos(pos[0], pos[1])
|
|
|
+ self.logger.info("Cell collected from defeeder.")
|
|
|
+
|
|
|
+
|