|
|
@@ -1,28 +1,36 @@
|
|
|
from typing import Tuple, Optional
|
|
|
import time
|
|
|
-
|
|
|
-# TODO[SG]: Implement actual robot movement commands. This is just a placeholder.
|
|
|
+import asyncio
|
|
|
+from ..api.grbl_handler import GRBLHandler
|
|
|
|
|
|
class RobotMovement:
|
|
|
- def __init__(self):
|
|
|
- self.current_position: Tuple[float, float, float] = (0.0, 0.0, 0.0) # x, y, z
|
|
|
- self.speed: float = 100.0 # Default speed in mm/s
|
|
|
+ def __init__(self, grbl: GRBLHandler):
|
|
|
+ self.grbl = grbl
|
|
|
+ self.current_position: Tuple[float, float, float] = (0.0, 0.0, 0.0)
|
|
|
+ self.speed: float = 100.0 # TODO [SG]: Set at beginning?
|
|
|
self.is_moving: bool = False
|
|
|
self.home_position: Tuple[float, float, float] = (0.0, 0.0, 0.0)
|
|
|
- self.gripper_state: bool = False # False = open, True = closed
|
|
|
+
|
|
|
+ # Register for position updates
|
|
|
+ self.grbl.register_position_callback(self._position_callback)
|
|
|
|
|
|
- def set_speed(self, speed: float) -> None:
|
|
|
- """Set the movement speed in mm/s."""
|
|
|
- if speed <= 0:
|
|
|
+ def _position_callback(self, pos: dict):
|
|
|
+ self.current_position = (pos['X'], pos['Y'], pos['Z'])
|
|
|
+
|
|
|
+ def set_speed(self, speed_mms: float) -> None:
|
|
|
+ if speed_mms <= 0:
|
|
|
raise ValueError("Speed must be positive")
|
|
|
- self.speed = speed
|
|
|
+ self.speed = speed_mms
|
|
|
+ # Set GRBL feed rate
|
|
|
+ self.grbl.send_gcode([f'F{speed_mms * 60}']) # Convert mm/s to mm/min
|
|
|
|
|
|
async def move_to_position(self, x: float, y: float, z: float) -> bool:
|
|
|
- """Move to absolute position. Returns True if movement successful."""
|
|
|
try:
|
|
|
self.is_moving = True
|
|
|
- # Here you would implement actual robot movement commands
|
|
|
- self.current_position = (x, y, z)
|
|
|
+ # Send G-code move command
|
|
|
+ await self.grbl.send_gcode([
|
|
|
+ f'G1 X{x:.3f} Y{y:.3f} Z{z:.3f} F{self.speed * 60}'
|
|
|
+ ])
|
|
|
self.is_moving = False
|
|
|
return True
|
|
|
except Exception as e:
|
|
|
@@ -30,46 +38,33 @@ class RobotMovement:
|
|
|
raise RuntimeError(f"Movement failed: {str(e)}")
|
|
|
|
|
|
async def move_relative(self, dx: float, dy: float, dz: float) -> bool:
|
|
|
- """Move relative to current position."""
|
|
|
- new_x = self.current_position[0] + dx
|
|
|
- new_y = self.current_position[1] + dy
|
|
|
- new_z = self.current_position[2] + dz
|
|
|
- return await self.move_to_position(new_x, new_y, new_z)
|
|
|
+ await self.grbl.send_gcode(['G91']) # Relative positioning
|
|
|
+ result = await self.move_to_position(dx, dy, dz)
|
|
|
+ await self.grbl.send_gcode(['G90']) # Back to absolute
|
|
|
+ return result
|
|
|
|
|
|
async def home(self) -> bool:
|
|
|
- """Move to home position."""
|
|
|
- return await self.move_to_position(*self.home_position)
|
|
|
+ self.is_moving = True
|
|
|
+ try:
|
|
|
+ await self.grbl.send_gcode(['$H']) # GRBL home command
|
|
|
+ self.is_moving = False
|
|
|
+ return True
|
|
|
+ except Exception as e:
|
|
|
+ self.is_moving = False
|
|
|
+ raise RuntimeError(f"Homing failed: {str(e)}")
|
|
|
|
|
|
def get_position(self) -> Tuple[float, float, float]:
|
|
|
- """Get current position."""
|
|
|
return self.current_position
|
|
|
|
|
|
async def stop(self) -> None:
|
|
|
"""Emergency stop."""
|
|
|
self.is_moving = False
|
|
|
- # Implement emergency stop command here
|
|
|
+ await self.grbl.send_gcode(['!', 'M5']) # Feed hold + stop endeffector
|
|
|
|
|
|
- def wait_until_idle(self, timeout: Optional[float] = None) -> bool:
|
|
|
- """Wait until movement is complete or timeout (in seconds) is reached."""
|
|
|
+ async def wait_until_idle(self, timeout: Optional[float] = None) -> bool:
|
|
|
start_time = time.time()
|
|
|
while self.is_moving:
|
|
|
if timeout and (time.time() - start_time) > timeout:
|
|
|
return False
|
|
|
- time.sleep(0.1)
|
|
|
- return True
|
|
|
-
|
|
|
- async def activate_gripper(self) -> bool:
|
|
|
- """Close gripper to grip cell. Returns True if successful."""
|
|
|
- try:
|
|
|
- self.gripper_state = True
|
|
|
- return True
|
|
|
- except Exception as e:
|
|
|
- raise RuntimeError(f"Failed to close gripper: {str(e)}")
|
|
|
-
|
|
|
- async def deactivate_gripper(self) -> bool:
|
|
|
- """Open gripper to release cell. Returns True if successful."""
|
|
|
- try:
|
|
|
- self.gripper_state = False
|
|
|
- return True
|
|
|
- except Exception as e:
|
|
|
- raise RuntimeError(f"Failed to open gripper: {str(e)}")
|
|
|
+ await asyncio.sleep(0.1) # Non-blocking wait
|
|
|
+ return True
|