Преглед на файлове

feat: enhance magazine management; add health thresholds

Silas Gruen преди 6 месеца
родител
ревизия
dd204f0fbc
променени са 4 файла, в които са добавени 99 реда и са изтрити 42 реда
  1. 20 7
      robot_control/config/config.yaml
  2. 15 14
      robot_control/src/robot/controller.py
  3. 40 9
      robot_control/src/robot/mag_distributor.py
  4. 24 12
      robot_control/src/utils/config.py

+ 20 - 7
robot_control/config/config.yaml

@@ -36,15 +36,28 @@ defeeder:
   position: [167.5, 630, 40]
   max_capacity: 10
 
-dropoff_grades:
-  - id: accepted
-    x_pos: 200
+feeder_magazines:
+  - x_pos: 100
+    rot_deg: 90
+    capacity: 70
+  - x_pos: 200
+    rot_deg: 90
+    capacity: 70
+  - x_pos: 300
+    rot_deg: 90
+    capacity: 70
+
+defeeder_magazines:
+  - x_pos: 200
     rot_deg: -90
-    capacity_threshold: 0.8
-  - id: rejected
-    x_pos: 100
+    capacity: 70
+    health_threshold: 0.8
+    name: accepted
+  - x_pos: 100
     rot_deg: -90
-    capacity_threshold: 0
+    capacity: 70
+    health_threshold: 0
+    name: rejected
 
 mqtt:
   broker: localhost # or debug

+ 15 - 14
robot_control/src/robot/controller.py

@@ -32,7 +32,7 @@ class RobotController:
         self.cells: dict[int, Cell] = {}
         self.devices = self.config.measurement_devices
         self.feeder = self.config.feeder
-        self.dropoff_grades = self.config.dropoff_grades
+        self.defeeder_magazines = self.config.defeeder_magazines
         self.gpio = gpio_handler
         self.i2c = i2c
         self.vision = vision
@@ -318,31 +318,32 @@ class RobotController:
 
         if cell.status is CellStatus.ERROR:
             cell.capacity = 0 # will be dropped off in the lowest grade
-        for grade in self.dropoff_grades:
-            if cell.capacity >= grade.capacity_threshold:
-                await self.dropoff_cell(grade)
-                logger.info(f"Cell {cell.id} sorted to grade {grade.id}")
+        for idx, mag in enumerate(self.defeeder_magazines):
+            if cell.capacity >= mag.health_threshold:
+                await self.dropoff_cell(idx)
+                logger.info(f"Cell {cell.id} sorted to magazine {mag.name}")
                 return True
-        logger.error(f"No suitable grade found for cell {cell.id} with capacity {cell.capacity}")
+        logger.error(f"No suitable magazine found for cell {cell.id} with capacity {cell.capacity}")
         return False
     
-    async def dropoff_cell(self, dropoff_grade: Optional[DropoffGradeConfig] = None):
+    async def dropoff_cell(self, defeeder_mag_idx: Optional[int] = None):
         if not self.gripper_occupied:
             logger.error("Cannot drop off: gripper not occupied")
             return
 
-        # If no dropoff_grade is given, use the last one in the list
-        if dropoff_grade is None:
-            if not self.dropoff_grades:
-                logger.error("No dropoff grades configured")
+        # If no defeeddefeeder_mag_idx is given, use the last one in the list
+        if defeeder_mag_idx is None:
+            if not self.defeeder_magazines:
+                logger.error("No dropoff magazines configured")
                 return
-            dropoff_grade = self.dropoff_grades[-1]
+            defeeder_mag_idx = len(self.defeeder_magazines)-1
         # Add the grade id to defeeder_queue when a cell is dropped off
         if self.defeeder_queue is not None:
             try:
-                await self.defeeder_queue.put(int(dropoff_grade.id))
+                await self.defeeder_queue.put(defeeder_mag_idx)
             except Exception as e:
                 logger.warning(f"Failed to put to defeeder_queue: {e}")
+
         pos = self.config.defeeder.robot_pos
         safe_pos = (pos[0], pos[1], self.config.movement.safe_height)
         logger.info(f"Moving to dropoff (safe) {safe_pos}...")
@@ -357,7 +358,7 @@ class RobotController:
         # Move back to safe height
         logger.info(f"Moving to dropoff position (safe) {safe_pos}...")
         await self.movement.move_to_position(*safe_pos)
-        logger.info(f"Cell dropped off at grade {dropoff_grade.id}")
+        logger.info(f"Cell dropped off at defeeder")
 
     def check_cell_voltage(self, voltage, cell_id_str = ""):
         if voltage < abs(self.config.feeder.min_voltage):

+ 40 - 9
robot_control/src/robot/mag_distributor.py

@@ -40,6 +40,9 @@ class MagDistributor:
         # Track current feeding magazine
         self.current_magazine = 0
 
+        # Track magazines already checked for picking
+        self.empty_magazines : set[int] = set()
+
         self.logger = logging.getLogger(__name__)
 
     def _speed_to_step_delay(self, speed_mm_s):
@@ -89,39 +92,67 @@ class MagDistributor:
             self.gpio.do_step(self.rot_dir_pin, self.rot_step_pin, rot_steps, step_delay, rot_dir)
             self.curr_rot_deg = rot_target
 
+    def reset_empty_magazines(self):
+        """Reset the set of magazines already checked for picking."""
+        self.empty_magazines.clear()
+
     def mag_to_feeder(self, magazine_id: Optional[int] = None):
         """
         Move a cell from a magazine to the feeder.
         If magazine_id is None, use current_magazine and try next if pick fails.
         """
         magazines = self.config.feeder.magazines
-        num_magazines = len(magazines)
-        tried = 0
         if magazine_id is not None:
             mags_to_try = [magazine_id]
         else:
-            mags_to_try = list(range(self.current_magazine, num_magazines)) + list(range(0, self.current_magazine))
+            # Only try magazines that have not been checked yet
+            mags_to_try = [i for i in (list(range(self.current_magazine, len(magazines))) + list(range(0, self.current_magazine)))
+                           if i not in self.empty_magazines]
+            
+        # Check if cell can be picked from magazines, if not add to empty magazines and continue
         for mag_id in mags_to_try:
-            pos = magazines[mag_id].pos
-            self.move_mag_distributor_at_pos(pos[0], pos[1])
+            x_pos = magazines[mag_id].x_pos
+            rot_deg = magazines[mag_id].rot_deg
+            self.move_mag_distributor_at_pos(x_pos, rot_deg)
             # Check cell pick sensor
             if self.gpio.get_pin(self.mag_dist_sensor_pin):
                 pos = self.config.feeder.mag_pos
+                x_pos = magazines[mag_id].x_pos
+                rot_deg = magazines[mag_id].rot_deg
                 self.move_mag_distributor_at_pos(pos[0], pos[1])
                 self.logger.info(f"Cell successfully picked from magazine {mag_id} and deposited to feeder.")
                 self.current_magazine = mag_id  # update current
                 return
             else:
                 self.logger.warning(f"Failed to pick cell from magazine {mag_id}. Trying next magazine.")
-                tried += 1
+                self.empty_magazines.add(mag_id)
                 continue
         self.logger.warning("No more available magazines to pick from. All attempts failed.")
 
     def defeeder_to_mag(self, magazine_id: int):
+        """
+        Move a cell from the defeeder to a specific magazine.
+        Includes checks for valid magazine_id and sensor confirmation.
+        """
+        # Check magazine_id validity
+        magazines = self.config.defeeder.magazines
+        if magazine_id < 0 or magazine_id >= len(magazines):
+            self.logger.error(f"Invalid magazine_id: {magazine_id}")
+            return
+
+        # Move to defeeder position
         pos = self.config.defeeder.mag_pos
         self.move_mag_distributor_at_pos(pos[0], pos[1])
-        pos = self.config.defeeder.magazines[magazine_id].pos
-        self.move_mag_distributor_at_pos(pos[0], pos[1])
-        self.logger.info("Cell collected from defeeder.")
+
+        # Optionally check for cell presence at defeeder (if sensor available)
+        if not self.gpio.get_pin(self.mag_dist_sensor_pin):
+            self.logger.warning("No cell detected at defeeder position.")
+            return
+
+        # Move to target magazine position
+        self.move_mag_distributor_at_pos(magazines[magazine_id].x_pos, magazines[magazine_id].rot_deg)
+
+        # Optionally check for successful placement (if sensor logic applies)
+        self.logger.info(f"Cell collected from defeeder and moved to magazine {magazine_id}.")
 
 

+ 24 - 12
robot_control/src/utils/config.py

@@ -27,8 +27,14 @@ class DeviceConfig(BaseModel):
         return v
 
 class MagazineConfig(BaseModel):
-    pos : Tuple[float, float]
+    x_pos: float = Field(default=0.0, ge=0, le=700)
+    rot_deg: float = Field(default=0.0, ge=-180.0, le=180.0)
     capacity: int = Field(default=10, ge=0)
+
+class DefeederMagazineConfig(MagazineConfig):
+    health_threshold: float = Field(ge=0.0, le=1.0)
+    name: str
+
 class FeederConfig(BaseModel):
     robot_pos: Tuple[float, float, float]
     mag_pos : Tuple[float, float]
@@ -40,7 +46,7 @@ class DefeederConfig(BaseModel):
     robot_pos: Tuple[float, float, float]
     mag_pos : Tuple[float, float]
     max_capacity: int = Field(default=10, ge=0)
-    magazines: List[MagazineConfig] = Field(default_factory=list)
+    magazines: List[DefeederMagazineConfig] = Field(default_factory=list)
 
 class MagDistributorConfig(BaseModel):
     max_speed: float = Field(default=100.0, ge=0.0)
@@ -51,7 +57,7 @@ class DropoffGradeConfig(BaseModel):
     id: str
     x_pos: float = Field(default=0.0, ge=0, le=700)
     rot_deg: float = Field(default=0.0, ge=-180.0, le=180.0)
-    capacity_threshold: float = Field(ge=0.0, le=1.0)
+    health_threshold: float = Field(ge=0.0, le=1.0)
 
 class MQTTConfig(BaseModel):
     broker: str = "localhost"
@@ -138,8 +144,9 @@ class RobotConfig(BaseModel):
     measurement_devices: List[DeviceConfig]
     feeder: FeederConfig
     defeeder: DefeederConfig
+    feeder_magazines: List[MagazineConfig] = Field(default_factory=list)
+    defeeder_magazines: List[DefeederMagazineConfig] = Field(default_factory=list)
     mag_distributor: MagDistributorConfig
-    dropoff_grades: List[DropoffGradeConfig]
     mqtt: MQTTConfig
     grbl: GRBLConfig
     vision: VisionConfig
@@ -150,18 +157,19 @@ class RobotConfig(BaseModel):
     logging: LoggingConfig
 
     @model_validator(mode='after')
-    def validate_dropoff_grades(self) -> 'RobotConfig':
+    def validate_defeeder_magazines(self) -> 'RobotConfig':
         # Sort grades by threshold in descending order for consistent behavior
-        sorted_grades = sorted(self.dropoff_grades, key=lambda x: x.capacity_threshold, reverse=True)
-        for i in range(1, len(sorted_grades)):
-            if sorted_grades[i-1].capacity_threshold <= sorted_grades[i].capacity_threshold:
+
+        sorted_defeeding_mags = sorted(self.defeeder_magazines, key=lambda x: x.health_threshold, reverse=True)
+        for i in range(1, len(sorted_defeeding_mags)):
+            if sorted_defeeding_mags[i-1].health_threshold <= sorted_defeeding_mags[i].health_threshold:
                 raise ValueError(
                     f"Dropoff grade thresholds must be in strictly descending order. Found: "
-                    f"Grade {sorted_grades[i-1].id} ({sorted_grades[i-1].capacity_threshold}) <= "
-                    f"Grade {sorted_grades[i].id} ({sorted_grades[i].capacity_threshold})"
+                    f"Grade {sorted_defeeding_mags[i-1].name} ({sorted_defeeding_mags[i-1].health_threshold}) <= "
+                    f"Grade {sorted_defeeding_mags[i].name} ({sorted_defeeding_mags[i].health_threshold})"
                 )
         # Store sorted grades to ensure consistent behavior
-        self.dropoff_grades = sorted_grades
+        self.defeeder_magazines = sorted_defeeding_mags
         return self
 
 class ConfigParser:
@@ -181,7 +189,11 @@ class ConfigParser:
                 device_id = device['id']
                 for slot in device['slots']:
                     slot['device_id'] = device_id
-                    
+
+            # Set default values for feeder and defeeder magazines
+            config_dict['feeder_magazines'] = config_dict.get('feeder_magazines', [])
+            config_dict['defeeder_magazines'] = config_dict.get('defeeder_magazines', [])
+
             return RobotConfig(**config_dict)
 
     @property