Browse Source

Connect robot movements to controller

Add robot gripper
Add async tests
Add safe_height
Silas Gruen 11 months ago
parent
commit
d0f28c040b

+ 1 - 0
robot-control/config/config.yaml

@@ -34,3 +34,4 @@ dropoff_grades:
 system_settings:
   speed: 100.0
   acceleration: 500.0
+  safe_height: 20.0

+ 3 - 1
robot-control/src/robot/config.py

@@ -29,6 +29,7 @@ class DropoffGradeConfig:
 class SystemConfig:
     speed: float
     acceleration: float
+    safe_height: float
 
 class ConfigParser:
     def __init__(self, config_path: str = "robot-control/config/config.yaml"):
@@ -77,5 +78,6 @@ class ConfigParser:
         settings = self.config['system_settings']
         return SystemConfig(
             speed=settings['speed'],
-            acceleration=settings['acceleration']
+            acceleration=settings['acceleration'],
+            safe_height=settings['safe_height']
         )

+ 107 - 26
robot-control/src/robot/controller.py

@@ -2,6 +2,7 @@ from dataclasses import dataclass
 from enum import Enum
 from typing import List, Tuple
 from .config import ConfigParser, SlotConfig
+from .movement import RobotMovement
 import logging
 
 logger = logging.getLogger(__name__)
@@ -33,6 +34,10 @@ class RobotController:
         self.rejected_grade = self.dropoff_grades['rejected']
         self.system_settings = self.config.get_system_settings()
         
+        # Initialize robot movement
+        self.movement = RobotMovement()
+        self.movement.set_speed(self.system_settings.speed)
+        
         # Initialize with configured values
         self.total_slots = sum(len(device.slots) for device in self.devices)
         self.work_queue: List[SlotConfig] = []
@@ -40,10 +45,31 @@ class RobotController:
         self.gripper_occupied = False
 
     async def pick_cell_from_feeder(self):
-        # Implementation for picking cell from feeder
-        logger.info("Picking cell from feeder")
-        self.gripper_occupied = True
-        pass
+        if self.gripper_occupied:
+            logger.error("Gripper already occupied")
+            return False
+            
+        try:
+            feeder_pos = self.feeder.pickup_position
+            x, y, z = feeder_pos
+            # Move to safe height first
+            await self.movement.move_to_position(x, y, self.system_settings.safe_height)
+            # Move to pickup position
+            await self.movement.move_to_position(x, y, z)
+            # Grip cell
+            if await self.movement.activate_gripper():
+                self.gripper_occupied = True
+            else:
+                raise RuntimeError("Failed to grip cell")
+            # Move back to safe height
+            await self.movement.move_to_position(x, y, self.system_settings.safe_height)
+            logger.info("Cell picked from feeder")
+            return True
+        except Exception as e:
+            logger.error(f"Failed to pick cell from feeder: {str(e)}")
+            await self.movement.deactivate_gripper() # Try to deactivate to avoid stuck gripper
+            self.gripper_occupied = False
+            return False
 
     async def read_cell_id(self):
         # Use vision system to read data matrix and return cell ID
@@ -71,43 +97,98 @@ class RobotController:
 
     async def insert_cell_to_slot(self, cell: Cell, slot: SlotConfig):
         if slot.occupied:
-            logger.error(f"Slot {slot.id} is already occupied")
-            return
+            logger.error(f"Slot at position {slot.position} is already occupied")
+            return False
         if not self.gripper_occupied:
             logger.error("Gripper not occupied")
-            return
-        slot.occupied = True
-        slot.cell_id = cell.id
-        self.gripper_occupied = False
-        logger.info(f"Cell {cell.id} inserted to slot at position {slot.position}")
-        # Move to slot and insert cell
-        pass
+            return False
+
+        # Move to slot position
+        try:
+            x, y, z = slot.position
+            # Move to safe height first
+            await self.movement.move_to_position(x, y, self.system_settings.safe_height)
+            # Move down to insertion position
+            await self.movement.move_to_position(x, y, z)
+            # Release cell
+            if await self.movement.deactivate_gripper():
+                slot.occupied = True
+                slot.cell_id = cell.id
+                self.gripper_occupied = False
+            else:
+                raise RuntimeError("Failed to release cell")
+            # Move back to safe height
+            await self.movement.move_to_position(x, y, self.system_settings.safe_height)
+            logger.info(f"Cell {cell.id} inserted to slot at position {slot.position}")
+            return True
+        except Exception as e:
+            logger.error(f"Failed to insert cell: {str(e)}")
+            return False
 
     async def collect_cell_from_slot(self, slot: SlotConfig):
         if self.gripper_occupied:
             logger.error("Gripper already occupied")
             return None
-        # Collect cell from measurement slot
-        self.gripper_occupied = True
-        slot.occupied = False
-        cell_id = slot.cell_id
-        slot.cell_id = None
-        logger.info(f"Cell {slot.cell_id} collected from slot at position {slot.position}")
-        return cell_id
+        
+        try:
+            x, y, z = slot.position
+            # Move to safe height first
+            await self.movement.move_to_position(x, y, self.system_settings.safe_height)
+            # Move down to collection position
+            await self.movement.move_to_position(x, y, z)
+            # Grip cell
+            if await self.movement.activate_gripper():
+                self.gripper_occupied = True
+                cell_id = slot.cell_id
+                slot.occupied = False
+                slot.cell_id = None
+            else:
+                raise RuntimeError("Failed to grip cell")
+            # Move back to safe height
+            await self.movement.move_to_position(x, y, self.system_settings.safe_height)
+            logger.info(f"Cell {cell_id} collected from slot at position {slot.position}")
+            return cell_id
+        except Exception as e:
+            logger.error(f"Failed to collect cell: {str(e)}")
+            await self.movement.deactivate_gripper() # Try to deactivate to avoid stuck gripper
+            self.gripper_occupied = False
+            return None
 
     async def sort_cell(self, cell: Cell):
+        if not self.gripper_occupied:
+            logger.error("Gripper not occupied")
+            return False
         if cell.status == CellStatus.FAILED:
-            await self.dropoff_cell(cell.measurement_slot, self.rejected_grade)
+            await self.dropoff_cell(self.rejected_grade)
             logger.info(f"Cell {cell.id} sorted to rejected grade")
-            return
+            return True
         for name, grade in self.dropoff_grades.items():
             if cell.capacity >= grade.capacity_threshold:
-                await self.dropoff_cell(cell, grade)
+                await self.dropoff_cell(grade)
                 self.gripper_occupied = False
                 logger.info(f"Cell {cell.id} sorted to grade {name}")
-                return
+                return True
         logger.error(f"No suitable grade found for cell {cell.id} with capacity {cell.capacity}")
+        return False
 
     async def dropoff_cell(self, dropoff_grade: DropoffGrade):
-        # Drop collected cell at position
-        pass
+        if not self.gripper_occupied:
+            logger.error("Cannot drop off: gripper not occupied")
+            return
+            
+        try:
+            x, y, z = dropoff_grade.position
+            # Move to safe height first
+            await self.movement.move_to_position(x, y, self.system_settings.safe_height)
+            # Move to dropoff position
+            await self.movement.move_to_position(x, y, z)
+            # Release cell
+            if await self.movement.deactivate_gripper():
+                self.gripper_occupied = False
+            else:
+                raise RuntimeError("Failed to release cell")
+            # Move back to safe height
+            await self.movement.move_to_position(x, y, self.system_settings.safe_height)
+            logger.info(f"Cell dropped off at grade {dropoff_grade.name}")
+        except Exception as e:
+            logger.error(f"Failed to drop off cell: {str(e)}")

+ 24 - 7
robot-control/src/robot/movement.py

@@ -9,6 +9,7 @@ class RobotMovement:
         self.speed: float = 100.0  # Default speed in mm/s
         self.is_moving: bool = False
         self.home_position: Tuple[float, float, float] = (0.0, 0.0, 0.0)
+        self.gripper_state: bool = False  # False = open, True = closed
 
     def set_speed(self, speed: float) -> None:
         """Set the movement speed in mm/s."""
@@ -16,7 +17,7 @@ class RobotMovement:
             raise ValueError("Speed must be positive")
         self.speed = speed
 
-    def move_to_position(self, x: float, y: float, z: float) -> bool:
+    async def move_to_position(self, x: float, y: float, z: float) -> bool:
         """Move to absolute position. Returns True if movement successful."""
         try:
             self.is_moving = True
@@ -28,22 +29,22 @@ class RobotMovement:
             self.is_moving = False
             raise RuntimeError(f"Movement failed: {str(e)}")
 
-    def move_relative(self, dx: float, dy: float, dz: float) -> bool:
+    async def move_relative(self, dx: float, dy: float, dz: float) -> bool:
         """Move relative to current position."""
         new_x = self.current_position[0] + dx
         new_y = self.current_position[1] + dy
         new_z = self.current_position[2] + dz
-        return self.move_to_position(new_x, new_y, new_z)
+        return await self.move_to_position(new_x, new_y, new_z)
 
-    def home(self) -> bool:
+    async def home(self) -> bool:
         """Move to home position."""
-        return self.move_to_position(*self.home_position)
+        return await self.move_to_position(*self.home_position)
 
     def get_position(self) -> Tuple[float, float, float]:
         """Get current position."""
         return self.current_position
 
-    def stop(self) -> None:
+    async def stop(self) -> None:
         """Emergency stop."""
         self.is_moving = False
         # Implement emergency stop command here
@@ -55,4 +56,20 @@ class RobotMovement:
             if timeout and (time.time() - start_time) > timeout:
                 return False
             time.sleep(0.1)
-        return True
+        return True
+
+    async def activate_gripper(self) -> bool:
+        """Close gripper to grip cell. Returns True if successful."""
+        try:
+            self.gripper_state = True
+            return True
+        except Exception as e:
+            raise RuntimeError(f"Failed to close gripper: {str(e)}")
+
+    async def deactivate_gripper(self) -> bool:
+        """Open gripper to release cell. Returns True if successful."""
+        try:
+            self.gripper_state = False
+            return True
+        except Exception as e:
+            raise RuntimeError(f"Failed to open gripper: {str(e)}")

+ 53 - 6
robot-control/tests/test_robot.py

@@ -23,15 +23,16 @@ class TestRobotMovement:
         with pytest.raises(ValueError):
             robot_movement.set_speed(-100.0)
 
-    def test_move_to_position(self, robot_movement: RobotMovement):
-        result = robot_movement.move_to_position(100.0, 200.0, 300.0)
+    @pytest.mark.asyncio
+    async def test_move_to_position(self, robot_movement: RobotMovement):
+        result = await robot_movement.move_to_position(100.0, 200.0, 300.0)
         assert result is True
         assert robot_movement.get_position() == (100.0, 200.0, 300.0)
 
-    def test_move_relative(self, robot_movement: RobotMovement):
-        robot_movement.move_to_position(100.0, 100.0, 100.0)
-        result = robot_movement.move_relative(50.0, -50.0, 25.0)
-        assert result is True
+    @pytest.mark.asyncio
+    async def test_move_relative(self, robot_movement: RobotMovement):
+        await robot_movement.move_to_position(100.0, 100.0, 100.0)
+        assert await robot_movement.move_relative(50.0, -50.0, 25.0)
         assert robot_movement.get_position() == (150.0, 50.0, 125.0)
 
 class TestRobotController:
@@ -62,3 +63,49 @@ class TestRobotController:
         assert not slot.occupied
         assert slot.cell_id is None
         assert collected_cell == cell.id
+
+    @pytest.mark.asyncio
+    async def test_sort_empty_gripper(self, robot_controller: RobotController):
+        cell = Cell(
+            id="test_cell",
+            status=CellStatus.COMPLETED,
+            measurement_slot=1,
+            capacity=3000.0
+        )
+        robot_controller.gripper_occupied = False
+        assert not await robot_controller.sort_cell(cell)
+
+    @pytest.mark.asyncio
+    async def test_sort_failed_cell(self, robot_controller: RobotController):
+        failed_cell = Cell(
+            id="failed_cell",
+            status=CellStatus.FAILED,
+            measurement_slot=1
+        )
+        robot_controller.gripper_occupied = True
+        await robot_controller.sort_cell(failed_cell)
+        assert not robot_controller.gripper_occupied
+        
+    @pytest.mark.asyncio
+    async def test_sort_cell_by_capacity(self, robot_controller: RobotController):
+        # Test sorting a cell with capacity that matches a grade
+        cell = Cell(
+            id="test_cell",
+            status=CellStatus.COMPLETED,
+            measurement_slot=1,
+            capacity=3000.0
+        )
+        robot_controller.gripper_occupied = True
+        await robot_controller.sort_cell(cell)
+        assert not robot_controller.gripper_occupied
+
+        # Test sorting a cell with capacity that doesn't match any grade
+        low_cap_cell = Cell(
+            id="low_cap_cell",
+            status=CellStatus.COMPLETED,
+            measurement_slot=1,
+            capacity=0.0
+        )
+        robot_controller.gripper_occupied = True
+        await robot_controller.sort_cell(low_cap_cell)
+        assert not robot_controller.gripper_occupied