import logging import asyncio from robot_control.src.utils.config import RobotConfig from robot_control.src.api.gpio import GPIOInterface from typing import Optional class MagDistributor: def __init__(self, config: RobotConfig, gpio: GPIOInterface): self.config = config self.gpio = gpio # Pin assignments from config gpio_conf = config.gpio self.linear_dir_pin = gpio_conf.mag_dist_pos_dir_pin self.linear_step_pin = gpio_conf.mag_dist_pos_step_pin # self.linear_en_pin = gpio_conf.mag_dist_pos_en_pin self.rot_dir_pin = gpio_conf.mag_dist_rot_dir_pin self.rot_step_pin = gpio_conf.mag_dist_rot_step_pin # self.rot_en_pin = gpio_conf.mag_dist_rot_en_pin # Endstop pins self.linear_limit_pin = gpio_conf.mag_dist_pos_limit_pin self.rot_limit_pin = gpio_conf.mag_dist_rot_limit_pin # Cell pick sensor pin self.mag_dist_sensor_pin = gpio_conf.mag_dist_sensor_pin # <-- Add this to your config # Max travel (mm or steps, as appropriate) self.full_rot_steps = 3200 # Assuming 3200 steps for a full rotation self.linear_length_mm = config.mag_distributor.length_mm self.steps_per_mm = self.full_rot_steps / 40 # Assuming 40mm per rotation self.max_speed_mms = config.mag_distributor.max_speed_mms # mm/s self.home_speed_mms = config.mag_distributor.home_speed_mms # mm/s self.debug = config.mag_distributor.debug # Current position tracking (steps or mm/deg) self.curr_pos_mm = 0 self.curr_rot_deg = 0 # Track current feeding magazine self.current_magazine = 0 # Track magazines already checked for picking self.empty_magazines : set[int] = set() self.logger = logging.getLogger(__name__) def _speed_to_step_delay(self, speed_mm_s): """Convert speed in mm/s to step delay in ms for do_step.""" steps_per_s = speed_mm_s * self.steps_per_mm if steps_per_s == 0: return 1000 # fallback to slow step_delay_s = 1.0 / steps_per_s return int(step_delay_s * 1000) # ms async def home(self): """Home both axes using endstops.""" self.logger.info("Homing linear axis...") max_linear_steps = int(self.linear_length_mm * self.steps_per_mm) home_step_delay = self._speed_to_step_delay(self.home_speed_mms) await self._home_axis(self.linear_dir_pin, self.linear_step_pin, self.linear_limit_pin, axis='linear', max_steps=max_linear_steps, step_delay=home_step_delay) self.curr_pos_mm = 0 self.logger.info("Homing rotational axis...") await self._home_axis(self.rot_dir_pin, self.rot_step_pin, self.rot_limit_pin, axis='rot', max_steps=self.full_rot_steps, step_delay=home_step_delay) self.curr_rot_deg = 0 self.logger.info("Mag distributor homed successfully.") async def _home_axis(self, dir_pin, step_pin, endstop_pin, axis='linear', max_steps = 10000, step_delay=200): if self.debug: return # Move in negative direction until endstop is triggered for _ in range(max_steps): if self.gpio.get_pin(endstop_pin): break self.gpio.do_step(dir_pin, step_pin, 1, step_delay, direction=True) await asyncio.sleep(0.001) else: raise RuntimeError(f"{axis} axis homing failed") self.logger.info(f"{axis} axis homed.") def move_mag_distributor_at_pos(self, pos_target, rot_target, step_delay=None): if self.debug: return self.logger.info(f"Moving mag distributor to linear: {pos_target} mm, rot: {rot_target} deg") linear_steps = int(abs(pos_target - self.curr_pos_mm) * self.steps_per_mm) rot_steps = int(abs(rot_target - self.curr_rot_deg) / 360 * self.full_rot_steps) linear_dir = True if pos_target > self.curr_pos_mm else False rot_dir = True if rot_target > self.curr_rot_deg else False # Use max speed for normal moves if not specified if step_delay is None: step_delay = self._speed_to_step_delay(self.max_speed_mms) if linear_steps > 0: self.gpio.do_step(self.linear_dir_pin, self.linear_step_pin, linear_steps, step_delay, linear_dir) self.curr_pos_mm = pos_target if rot_steps > 0: self.gpio.do_step(self.rot_dir_pin, self.rot_step_pin, rot_steps, step_delay, rot_dir) self.curr_rot_deg = rot_target def reset_empty_magazines(self): """Reset the set of magazines already checked for picking.""" self.empty_magazines.clear() def mag_to_feeder(self, magazine_id: Optional[int] = None): """ Move a cell from a magazine to the feeder. If magazine_id is None, use current_magazine and try next if pick fails. """ magazines = self.config.feeder_magazines if magazine_id is not None: mags_to_try = [magazine_id] else: # Only try magazines that have not been checked yet mags_to_try = [i for i in (list(range(self.current_magazine, len(magazines))) + list(range(0, self.current_magazine))) if i not in self.empty_magazines] # Check if cell can be picked from magazines, if not add to empty magazines and continue for mag_id in mags_to_try: pos = magazines[mag_id].mag_pos self.move_mag_distributor_at_pos(pos.pos_mm, pos.rot_deg) # Check cell pick sensor if self.mag_dist_sensor_pin and self.gpio.get_pin(self.mag_dist_sensor_pin): pos = self.config.feeder.mag_pos self.move_mag_distributor_at_pos(pos.pos_mm, pos.rot_deg) self.logger.info(f"Cell successfully picked from magazine {mag_id} and deposited to feeder.") self.current_magazine = mag_id # update current return else: self.logger.warning(f"Failed to pick cell from magazine {mag_id}. Trying next magazine.") self.empty_magazines.add(mag_id) continue self.logger.warning("No more available magazines to pick from. All attempts failed.") def defeeder_to_mag(self, magazine_id: int): """ Move a cell from the defeeder to a specific magazine. Includes checks for valid magazine_id and sensor confirmation. """ # Check magazine_id validity magazines = self.config.defeeder_magazines if magazine_id < 0 or magazine_id >= len(magazines): self.logger.error(f"Invalid magazine_id: {magazine_id}") return # Move to defeeder position pos = self.config.defeeder.mag_pos self.move_mag_distributor_at_pos(pos.pos_mm, pos.rot_deg) # Optionally check for cell presence at defeeder (if sensor available) if self.mag_dist_sensor_pin and not self.gpio.get_pin(self.mag_dist_sensor_pin): self.logger.warning("No cell detected at defeeder position.") return # Move to target magazine position pos = magazines[magazine_id].mag_pos self.move_mag_distributor_at_pos(pos.pos_mm, pos.rot_deg) # Optionally check for successful placement (if sensor logic applies) self.logger.info(f"Cell collected from defeeder and moved to magazine {magazine_id}.")