|
@@ -10,17 +10,22 @@ from robot_control.src.utils.config import ConfigParser
|
|
|
logger = LoggerSingleton.get_logger()
|
|
logger = LoggerSingleton.get_logger()
|
|
|
|
|
|
|
|
class RobotMovement:
|
|
class RobotMovement:
|
|
|
- def __init__(self, grbl: GRBLHandler, config: ConfigParser):
|
|
|
|
|
- self.pi = pigpio.pi()
|
|
|
|
|
|
|
+ """Class for controlling robot movements and end effector operations."""
|
|
|
|
|
+
|
|
|
|
|
+ def __init__(self, config: ConfigParser, grbl: GRBLHandler):
|
|
|
|
|
+ """Initialize the robot movement controller.
|
|
|
|
|
+
|
|
|
|
|
+ Args:
|
|
|
|
|
+ grbl (GRBLHandler): GRBL communication handler
|
|
|
|
|
+ config (ConfigParser): Configuration parser instance
|
|
|
|
|
+ """
|
|
|
self.config = config
|
|
self.config = config
|
|
|
- vacuum_config = config.get_vacuum_config()
|
|
|
|
|
- self.valve_gpio_pin = vacuum_config.get("valve_gpio_pin")
|
|
|
|
|
- self.pi.set_mode(self.valve_gpio_pin, pigpio.OUTPUT)
|
|
|
|
|
- self.pi.write(self.valve_gpio_pin, 0) # Ensure pump is off initially
|
|
|
|
|
|
|
|
|
|
|
|
+ # Setting configs
|
|
|
sys_config = config.get_system_settings()
|
|
sys_config = config.get_system_settings()
|
|
|
self.speed: float = sys_config.speed
|
|
self.speed: float = sys_config.speed
|
|
|
|
|
|
|
|
|
|
+ # Basic variables for robot movement
|
|
|
self.home_position: tuple[float, float, float] = (0.0, 0.0, 0.0)
|
|
self.home_position: tuple[float, float, float] = (0.0, 0.0, 0.0)
|
|
|
self.current_position: tuple[float, float, float] = (0.0, 0.0, 0.0)
|
|
self.current_position: tuple[float, float, float] = (0.0, 0.0, 0.0)
|
|
|
self.is_moving: bool = False
|
|
self.is_moving: bool = False
|
|
@@ -31,9 +36,22 @@ class RobotMovement:
|
|
|
self.grbl.register_position_callback(self._position_callback)
|
|
self.grbl.register_position_callback(self._position_callback)
|
|
|
|
|
|
|
|
def _position_callback(self, pos: dict):
|
|
def _position_callback(self, pos: dict):
|
|
|
|
|
+ """Update the current position from GRBL feedback.
|
|
|
|
|
+
|
|
|
|
|
+ Args:
|
|
|
|
|
+ pos (dict): Position dictionary containing coordinates
|
|
|
|
|
+ """
|
|
|
self.current_position = pos
|
|
self.current_position = pos
|
|
|
|
|
|
|
|
async def set_speed(self, speed_mms: float) -> None:
|
|
async def set_speed(self, speed_mms: float) -> None:
|
|
|
|
|
+ """Set the movement speed of the robot.
|
|
|
|
|
+
|
|
|
|
|
+ Args:
|
|
|
|
|
+ speed_mms (float): Speed in millimeters per second
|
|
|
|
|
+
|
|
|
|
|
+ Raises:
|
|
|
|
|
+ ValueError: If speed is not positive
|
|
|
|
|
+ """
|
|
|
if speed_mms <= 0:
|
|
if speed_mms <= 0:
|
|
|
raise ValueError("Speed must be positive")
|
|
raise ValueError("Speed must be positive")
|
|
|
self.speed = speed_mms
|
|
self.speed = speed_mms
|
|
@@ -41,6 +59,19 @@ class RobotMovement:
|
|
|
await self.grbl.send_gcode([f'F{speed_mms * 60}']) # Convert mm/s to mm/min
|
|
await self.grbl.send_gcode([f'F{speed_mms * 60}']) # Convert mm/s to mm/min
|
|
|
|
|
|
|
|
async def move_to_position(self, x: float, y: float, z: float) -> tuple[float, float, float]:
|
|
async def move_to_position(self, x: float, y: float, z: float) -> tuple[float, float, float]:
|
|
|
|
|
+ """Move the robot to an absolute position.
|
|
|
|
|
+
|
|
|
|
|
+ Args:
|
|
|
|
|
+ x (float): X coordinate in mm
|
|
|
|
|
+ y (float): Y coordinate in mm
|
|
|
|
|
+ z (float): Z coordinate in mm
|
|
|
|
|
+
|
|
|
|
|
+ Returns:
|
|
|
|
|
+ tuple[float, float, float]: Actual position reached
|
|
|
|
|
+
|
|
|
|
|
+ Raises:
|
|
|
|
|
+ RuntimeError: If movement fails or times out
|
|
|
|
|
+ """
|
|
|
try:
|
|
try:
|
|
|
self.is_moving = True
|
|
self.is_moving = True
|
|
|
logger.debug(f"Moving to position X:{x:.3f} Y:{y:.3f} Z:{z:.3f}")
|
|
logger.debug(f"Moving to position X:{x:.3f} Y:{y:.3f} Z:{z:.3f}")
|
|
@@ -60,13 +91,31 @@ class RobotMovement:
|
|
|
self.is_moving = False
|
|
self.is_moving = False
|
|
|
|
|
|
|
|
async def move_relative(self, dx: float, dy: float, dz: float) -> tuple[float, float, float]:
|
|
async def move_relative(self, dx: float, dy: float, dz: float) -> tuple[float, float, float]:
|
|
|
|
|
+ """Move the robot by a relative distance.
|
|
|
|
|
+
|
|
|
|
|
+ Args:
|
|
|
|
|
+ dx (float): Relative X movement in mm
|
|
|
|
|
+ dy (float): Relative Y movement in mm
|
|
|
|
|
+ dz (float): Relative Z movement in mm
|
|
|
|
|
+
|
|
|
|
|
+ Returns:
|
|
|
|
|
+ tuple[float, float, float]: Actual position reached
|
|
|
|
|
+ """
|
|
|
logger.debug(f"Moving relative by dX:{dx:.3f} dY:{dy:.3f} dZ:{dz:.3f}")
|
|
logger.debug(f"Moving relative by dX:{dx:.3f} dY:{dy:.3f} dZ:{dz:.3f}")
|
|
|
await self.grbl.send_gcode(['G91']) # Relative positioning
|
|
await self.grbl.send_gcode(['G91']) # Relative positioning
|
|
|
result = await self.move_to_position(dx, dy, dz)
|
|
result = await self.move_to_position(dx, dy, dz)
|
|
|
await self.grbl.send_gcode(['G90']) # Back to absolute
|
|
await self.grbl.send_gcode(['G90']) # Back to absolute
|
|
|
return result
|
|
return result
|
|
|
|
|
|
|
|
- async def home(self) -> bool:
|
|
|
|
|
|
|
+ async def perform_homing(self) -> bool:
|
|
|
|
|
+ """Home all axes of the machine.
|
|
|
|
|
+
|
|
|
|
|
+ Returns:
|
|
|
|
|
+ bool: True if homing successful
|
|
|
|
|
+
|
|
|
|
|
+ Raises:
|
|
|
|
|
+ RuntimeError: If homing fails
|
|
|
|
|
+ """
|
|
|
self.is_moving = True
|
|
self.is_moving = True
|
|
|
try:
|
|
try:
|
|
|
logger.info("Homing machine...")
|
|
logger.info("Homing machine...")
|
|
@@ -78,36 +127,16 @@ class RobotMovement:
|
|
|
raise RuntimeError(f"Homing failed: {str(e)}")
|
|
raise RuntimeError(f"Homing failed: {str(e)}")
|
|
|
|
|
|
|
|
def get_position(self) -> tuple[float, float, float]:
|
|
def get_position(self) -> tuple[float, float, float]:
|
|
|
- return self.current_position
|
|
|
|
|
-
|
|
|
|
|
- async def activate_endeffector(self) -> bool:
|
|
|
|
|
- """Activate the pump end effector."""
|
|
|
|
|
- try:
|
|
|
|
|
- self.pi.write(self.valve_gpio_pin, 1)
|
|
|
|
|
- self.endeffector_active = True
|
|
|
|
|
- logger.debug("End effector activated")
|
|
|
|
|
- return True
|
|
|
|
|
- except Exception as e:
|
|
|
|
|
- raise RuntimeError(f"Failed to activate end effector: {str(e)}")
|
|
|
|
|
-
|
|
|
|
|
|
|
+ """Get the current position of the robot.
|
|
|
|
|
|
|
|
- async def deactivate_endeffector(self) -> bool:
|
|
|
|
|
- """Deactivate the pump end effector."""
|
|
|
|
|
- try:
|
|
|
|
|
- self.pi.write(self.valve_gpio_pin, 0)
|
|
|
|
|
- self.endeffector_active = False
|
|
|
|
|
- logger.debug("End effector deactivated")
|
|
|
|
|
- return True
|
|
|
|
|
- except Exception as e:
|
|
|
|
|
- raise RuntimeError(f"Failed to deactivate end effector: {str(e)}")
|
|
|
|
|
-
|
|
|
|
|
- def is_endeffector_active(self) -> bool:
|
|
|
|
|
- """Get end effector state."""
|
|
|
|
|
- return self.endeffector_active
|
|
|
|
|
|
|
+ Returns:
|
|
|
|
|
+ tuple[float, float, float]: Current (X, Y, Z) position
|
|
|
|
|
+ """
|
|
|
|
|
+ return self.current_position
|
|
|
|
|
|
|
|
async def stop(self) -> None:
|
|
async def stop(self) -> None:
|
|
|
"""Emergency stop."""
|
|
"""Emergency stop."""
|
|
|
self.is_moving = False
|
|
self.is_moving = False
|
|
|
self.endeffector_active = False
|
|
self.endeffector_active = False
|
|
|
logger.warning("Emergency stop triggered!")
|
|
logger.warning("Emergency stop triggered!")
|
|
|
- await self.grbl.send_gcode(['!', 'M5']) # Feed hold + stop pump
|
|
|
|
|
|
|
+ await self.grbl.send_gcode(['!']) # Feed hold
|