|
|
@@ -1,7 +1,7 @@
|
|
|
from dataclasses import dataclass
|
|
|
from enum import Enum
|
|
|
from typing import List, Tuple, Optional
|
|
|
-from robot_control.src.utils.config import RobotConfig, SlotConfig, DeviceConfig, DropoffGradeConfig
|
|
|
+from robot_control.src.utils.config import RobotConfig, SlotConfig, DeviceConfig, DefeederMagazineConfig
|
|
|
from robot_control.src.robot.movement import RobotMovement
|
|
|
import logging
|
|
|
from robot_control.src.api.mqtt_handler import MQTTHandler, MeasurementResult
|
|
|
@@ -25,10 +25,10 @@ class CellStatus(Enum):
|
|
|
class Cell:
|
|
|
id: int
|
|
|
status: CellStatus = CellStatus.WAITING
|
|
|
- capacity: float = 0
|
|
|
+ health: float = 0
|
|
|
|
|
|
class RobotController:
|
|
|
- def __init__(self, config: RobotConfig, gpio_handler: GPIOInterface, i2c, vision, pump_controller, feeder_queue: asyncio.Queue[int], defeeder_queue: asyncio.Queue[int]):
|
|
|
+ def __init__(self, config: RobotConfig, gpio_handler: GPIOInterface, i2c, vision, pump_controller, feeder_queue: asyncio.Queue[int], defeeder_queue: asyncio.Queue[DefeederMagazineConfig]):
|
|
|
# Store configuration and hardware interfaces
|
|
|
self.config = config
|
|
|
self.cells: dict[int, Cell] = {}
|
|
|
@@ -189,10 +189,10 @@ class RobotController:
|
|
|
await self.pick_cell_from_slot(waiting_slot)
|
|
|
|
|
|
if not cell: # Cell not found, create new
|
|
|
- cell = Cell(measurement_result.cell_id, cell_status, capacity=measurement_result.capacity)
|
|
|
+ cell = Cell(measurement_result.cell_id, cell_status, health=measurement_result.health)
|
|
|
self.cells[measurement_result.cell_id] = cell
|
|
|
else:
|
|
|
- cell.capacity = measurement_result.capacity
|
|
|
+ cell.health = measurement_result.health
|
|
|
cell.status = cell_status
|
|
|
|
|
|
await self.sort_cell(cell)
|
|
|
@@ -355,16 +355,16 @@ class RobotController:
|
|
|
del self.cells[cell.id] # Remove cell from our database TODO [SG]: Should we keep it for history?
|
|
|
|
|
|
if cell.status is CellStatus.ERROR:
|
|
|
- cell.capacity = 0 # will be dropped off in the lowest grade
|
|
|
- for idx, mag in enumerate(self.defeeder_magazines):
|
|
|
- if cell.capacity >= mag.health_threshold:
|
|
|
- await self.dropoff_cell(idx)
|
|
|
+ cell.health = 0 # will be dropped off in the lowest grade
|
|
|
+ for mag in self.defeeder_magazines:
|
|
|
+ if cell.health >= mag.health_range[0] and cell.health <= mag.health_range[1]:
|
|
|
+ await self.dropoff_cell(mag)
|
|
|
logger.info(f"Cell {cell.id} sorted to magazine {mag.name}")
|
|
|
return True
|
|
|
- logger.error(f"No suitable magazine found for cell {cell.id} with capacity {cell.capacity}")
|
|
|
+ logger.error(f"No suitable magazine found for cell {cell.id} with capacity {cell.health}")
|
|
|
return False
|
|
|
|
|
|
- async def dropoff_cell(self, defeeder_mag_idx: Optional[int] = None):
|
|
|
+ async def dropoff_cell(self, defeeder_mag: Optional[DefeederMagazineConfig] = None):
|
|
|
"""
|
|
|
Drop off a cell into the specified defeeder magazine.
|
|
|
Adds the magazine index to the defeeder queue for async handling.
|
|
|
@@ -374,15 +374,11 @@ class RobotController:
|
|
|
return
|
|
|
|
|
|
# If no defeeddefeeder_mag_idx is given, use the last one in the list
|
|
|
- if defeeder_mag_idx is None:
|
|
|
- if not self.defeeder_magazines:
|
|
|
- logger.error("No dropoff magazines configured")
|
|
|
- return
|
|
|
- defeeder_mag_idx = len(self.defeeder_magazines)-1
|
|
|
- # Add the grade id to defeeder_queue when a cell is dropped off
|
|
|
+ if defeeder_mag is None:
|
|
|
+ defeeder_mag = self.defeeder_magazines[-1]
|
|
|
if self.defeeder_queue is not None:
|
|
|
try:
|
|
|
- await self.defeeder_queue.put(defeeder_mag_idx)
|
|
|
+ await self.defeeder_queue.put(defeeder_mag)
|
|
|
except Exception as e:
|
|
|
logger.warning(f"Failed to put to defeeder_queue: {e}")
|
|
|
|