| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162 |
- import logging
- import asyncio
- from robot_control.src.utils.config import RobotConfig
- from robot_control.src.api.gpio import GPIOInterface
- from typing import Optional
class MagDistributor:
    """Controller for the magazine distributor.

    The distributor has a linear axis and a rotational axis, each driven
    through a direction pin and a step pin on the supplied GPIO interface,
    and each with a limit (endstop) input used for homing. A further input,
    ``mag_dist_sensor_pin``, is read to confirm that a cell has been picked.

    The class tracks the commanded position (``curr_pos_mm``,
    ``curr_rot_deg``), the magazine currently used for feeding, and the set
    of magazines from which a pick has already failed.
    """

    def __init__(self, config: "RobotConfig", gpio: "GPIOInterface"):
        """Read pin assignments and motion parameters from ``config``.

        Args:
            config: Robot configuration. ``config.gpio`` and
                ``config.mag_distributor`` are read here; the feeder and
                defeeder sections are read later by the transfer methods.
            gpio: GPIO backend; this class calls its ``get_pin`` and
                ``do_step`` methods.
        """
        self.config = config
        self.gpio = gpio

        # Pin assignments from config
        gpio_conf = config.gpio
        self.linear_dir_pin = gpio_conf.mag_dist_pos_dir_pin
        self.linear_step_pin = gpio_conf.mag_dist_pos_step_pin
        # self.linear_en_pin = gpio_conf.mag_dist_pos_en_pin
        self.rot_dir_pin = gpio_conf.mag_dist_rot_dir_pin
        self.rot_step_pin = gpio_conf.mag_dist_rot_step_pin
        # self.rot_en_pin = gpio_conf.mag_dist_rot_en_pin

        # Endstop pins
        self.linear_limit_pin = gpio_conf.mag_dist_pos_limit_pin
        self.rot_limit_pin = gpio_conf.mag_dist_rot_limit_pin

        # Cell pick sensor pin (must be present in the GPIO config)
        self.mag_dist_sensor_pin = gpio_conf.mag_dist_sensor_pin

        # Travel and speed parameters
        self.full_rot_steps = 3200  # Assuming 3200 steps for a full rotation
        self.linear_length_mm = config.mag_distributor.length_mm
        self.steps_per_mm = self.full_rot_steps / 40  # Assuming 40mm per rotation
        self.max_speed_mms = config.mag_distributor.max_speed_mms  # mm/s
        self.home_speed_mms = config.mag_distributor.home_speed_mms  # mm/s
        # When true, homing and moves return immediately without touching GPIO.
        self.debug = config.mag_distributor.debug

        # Current commanded position (mm / degrees)
        self.curr_pos_mm = 0
        self.curr_rot_deg = 0

        # Index of the magazine currently used for feeding
        self.current_magazine = 0
        # Magazines from which a pick has already failed
        self.empty_magazines: set[int] = set()

        self.logger = logging.getLogger(__name__)

    def _speed_to_step_delay(self, speed_mm_s):
        """Convert a speed in mm/s to a whole-millisecond step delay for do_step.

        Args:
            speed_mm_s: Requested linear speed in mm/s.

        Returns:
            The delay between steps in ms, truncated to an int. A zero or
            negative speed yields the slow fallback of 1000 ms.
        """
        steps_per_s = speed_mm_s * self.steps_per_mm
        if steps_per_s <= 0:
            # No meaningful delay exists for a non-positive rate; a negative
            # speed would otherwise produce a negative delay.
            return 1000  # fallback to slow
        # Single division avoids the double rounding of (1.0 / x) * 1000.
        # NOTE(review): above 1000 steps/s this truncates to 0 ms - confirm
        # how do_step treats a zero delay.
        return int(1000.0 / steps_per_s)  # ms

    async def home(self):
        """Home the linear axis, then the rotational axis, using the endstops.

        On success the tracked position of each axis is reset to 0.

        Raises:
            RuntimeError: Propagated from ``_home_axis`` when an endstop is
                not reached within the allowed number of steps.
        """
        home_step_delay = self._speed_to_step_delay(self.home_speed_mms)

        self.logger.info("Homing linear axis...")
        # Never travel further than the configured length of the axis.
        max_linear_steps = int(self.linear_length_mm * self.steps_per_mm)
        await self._home_axis(
            self.linear_dir_pin,
            self.linear_step_pin,
            self.linear_limit_pin,
            axis='linear',
            max_steps=max_linear_steps,
            step_delay=home_step_delay,
        )
        self.curr_pos_mm = 0

        self.logger.info("Homing rotational axis...")
        await self._home_axis(
            self.rot_dir_pin,
            self.rot_step_pin,
            self.rot_limit_pin,
            axis='rot',
            max_steps=self.full_rot_steps,
            step_delay=home_step_delay,
        )
        self.curr_rot_deg = 0

        self.logger.info("Mag distributor homed successfully.")

    async def _home_axis(self, dir_pin, step_pin, endstop_pin, axis='linear', max_steps = 10000, step_delay=200):
        """Step one axis until its endstop reads true.

        Does nothing in debug mode.

        Args:
            dir_pin: Direction pin of the axis.
            step_pin: Step pin of the axis.
            endstop_pin: Input pin of the endstop for this axis.
            axis: Axis name, used only in log and error messages.
            max_steps: Maximum number of single steps to issue.
            step_delay: Step delay handed to ``do_step``.

        Raises:
            RuntimeError: If the endstop is still not triggered after
                ``max_steps`` steps.
        """
        if self.debug:
            return
        # NOTE(review): the original comment said "move in negative
        # direction", but direction=True is the value
        # move_mag_distributor_at_pos uses for increasing positions -
        # confirm against the wiring.
        for _ in range(max_steps):
            if self.gpio.get_pin(endstop_pin):
                break
            self.gpio.do_step(dir_pin, step_pin, 1, step_delay, direction=True)
            # Yield to the event loop between steps.
            await asyncio.sleep(0.001)
        else:
            # The last permitted step may have been the one that closed the
            # switch, so read it once more before declaring failure.
            if not self.gpio.get_pin(endstop_pin):
                raise RuntimeError(f"{axis} axis homing failed")
        self.logger.info(f"{axis} axis homed.")

    def move_mag_distributor_at_pos(self, pos_target, rot_target, step_delay=None):
        """Move to an absolute linear position and rotation.

        The linear axis is moved first, then the rotational axis. Does
        nothing in debug mode (tracked positions are left unchanged).

        Args:
            pos_target: Target linear position in mm.
            rot_target: Target rotation in degrees.
            step_delay: Step delay handed to ``do_step``; when None it is
                derived from ``max_speed_mms``.
        """
        if self.debug:
            return
        self.logger.info(f"Moving mag distributor to linear: {pos_target} mm, rot: {rot_target} deg")

        # Round to the nearest step instead of truncating: a float product
        # such as 28.999999999999996 must yield 29 steps, not 28, because the
        # tracked position below is set to the full target.
        linear_steps = round(abs(pos_target - self.curr_pos_mm) * self.steps_per_mm)
        rot_steps = round(abs(rot_target - self.curr_rot_deg) / 360 * self.full_rot_steps)
        linear_dir = pos_target > self.curr_pos_mm
        rot_dir = rot_target > self.curr_rot_deg

        # Use max speed for normal moves if not specified
        if step_delay is None:
            step_delay = self._speed_to_step_delay(self.max_speed_mms)

        if linear_steps > 0:
            self.gpio.do_step(self.linear_dir_pin, self.linear_step_pin, linear_steps, step_delay, linear_dir)
            self.curr_pos_mm = pos_target
        if rot_steps > 0:
            self.gpio.do_step(self.rot_dir_pin, self.rot_step_pin, rot_steps, step_delay, rot_dir)
            self.curr_rot_deg = rot_target

    def reset_empty_magazines(self):
        """Reset the set of magazines already checked for picking."""
        self.empty_magazines.clear()

    def mag_to_feeder(self, magazine_id: Optional[int] = None):
        """Move a cell from a magazine to the feeder.

        Args:
            magazine_id: Index into ``config.feeder_magazines``. When given,
                only that magazine is tried. When None, magazines are tried
                starting at ``current_magazine`` and wrapping around,
                skipping those already recorded in ``empty_magazines``.

        An out-of-range ``magazine_id`` is logged as an error and ignored.
        Each magazine whose pick is not confirmed by the sensor is added to
        ``empty_magazines``. On success ``current_magazine`` is updated.
        """
        magazines = self.config.feeder_magazines
        if magazine_id is not None:
            # Same validation as defeeder_to_mag: reject instead of raising
            # IndexError or silently wrapping a negative index.
            if magazine_id < 0 or magazine_id >= len(magazines):
                self.logger.error(f"Invalid magazine_id: {magazine_id}")
                return
            mags_to_try = [magazine_id]
        else:
            # Only try magazines that have not been checked yet
            rotation = [*range(self.current_magazine, len(magazines)), *range(self.current_magazine)]
            mags_to_try = [i for i in rotation if i not in self.empty_magazines]

        for mag_id in mags_to_try:
            pos = magazines[mag_id].mag_pos
            self.move_mag_distributor_at_pos(pos.pos_mm, pos.rot_deg)
            # Check cell pick sensor.
            # NOTE(review): a falsy sensor pin (None or 0) is treated as
            # "no cell detected" - confirm that pin 0 is never used.
            if self.mag_dist_sensor_pin and self.gpio.get_pin(self.mag_dist_sensor_pin):
                pos = self.config.feeder.mag_pos
                self.move_mag_distributor_at_pos(pos.pos_mm, pos.rot_deg)
                self.logger.info(f"Cell successfully picked from magazine {mag_id} and deposited to feeder.")
                self.current_magazine = mag_id  # update current
                return
            self.logger.warning(f"Failed to pick cell from magazine {mag_id}. Trying next magazine.")
            self.empty_magazines.add(mag_id)

        self.logger.warning("No more available magazines to pick from. All attempts failed.")

    def defeeder_to_mag(self, magazine_id: int):
        """Move a cell from the defeeder to a specific magazine.

        Args:
            magazine_id: Index into ``config.defeeder_magazines``.

        An out-of-range ``magazine_id`` is logged as an error and nothing is
        moved. If the sensor does not report a cell after moving to the
        defeeder position, a warning is logged and the transfer is abandoned.
        """
        # Check magazine_id validity
        magazines = self.config.defeeder_magazines
        if magazine_id < 0 or magazine_id >= len(magazines):
            self.logger.error(f"Invalid magazine_id: {magazine_id}")
            return

        # Move to defeeder position
        pos = self.config.defeeder.mag_pos
        self.move_mag_distributor_at_pos(pos.pos_mm, pos.rot_deg)

        # Check for cell presence at the defeeder; skipped when the sensor
        # pin is falsy.
        if self.mag_dist_sensor_pin and not self.gpio.get_pin(self.mag_dist_sensor_pin):
            self.logger.warning("No cell detected at defeeder position.")
            return

        # Move to target magazine position
        pos = magazines[magazine_id].mag_pos
        self.move_mag_distributor_at_pos(pos.pos_mm, pos.rot_deg)
        # No sensor confirmation of the placement is performed here.
        self.logger.info(f"Cell collected from defeeder and moved to magazine {magazine_id}.")
|