|
|
@@ -29,6 +29,7 @@ class Cell:
|
|
|
|
|
|
class RobotController:
|
|
|
def __init__(self, config: RobotConfig, gpio_handler: GPIOInterface, i2c, vision, pump_controller, feeder_queue: asyncio.Queue[int], defeeder_queue: asyncio.Queue[int]):
|
|
|
+ # Store configuration and hardware interfaces
|
|
|
self.config = config
|
|
|
self.cells: dict[int, Cell] = {}
|
|
|
self.devices = self.config.measurement_devices
|
|
|
@@ -42,7 +43,7 @@ class RobotController:
|
|
|
self.feeder_queue = feeder_queue
|
|
|
self.defeeder_queue = defeeder_queue
|
|
|
|
|
|
- # Initialize robot movement
|
|
|
+ # Initialize robot movement and GRBL handler
|
|
|
self.grbl_handler = GRBLHandler(
|
|
|
self.config.grbl.port,
|
|
|
self.config.grbl.baudrate
|
|
|
@@ -51,14 +52,14 @@ class RobotController:
|
|
|
|
|
|
self.valve_pin = self.config.gpio.valve_pin
|
|
|
|
|
|
- # Initialize with configured values
|
|
|
+ # Calculate total slots and initialize work queue
|
|
|
self.total_slots = sum(len(device.slots) for device in self.devices)
|
|
|
self.work_queue: List[MeasurementResult] = []
|
|
|
|
|
|
self.gripper_occupied = False
|
|
|
self.suction_state = False
|
|
|
|
|
|
- # Initialize MQTT handler
|
|
|
+ # Initialize MQTT handler for measurement devices
|
|
|
mqtt = self.config.mqtt
|
|
|
self.mqtt_handler = MQTTHandler(
|
|
|
broker=mqtt.broker,
|
|
|
@@ -76,6 +77,7 @@ class RobotController:
|
|
|
)
|
|
|
|
|
|
async def perform_homing(self):
|
|
|
+ """Perform homing sequence for robot axes."""
|
|
|
await self.movement.perform_homing()
|
|
|
|
|
|
def activate_endeffector(self) -> bool:
|
|
|
@@ -99,9 +101,14 @@ class RobotController:
|
|
|
raise RuntimeError(f"Failed to deactivate end effector: {str(e)}")
|
|
|
|
|
|
def set_suction_state(self, state:bool):
|
|
|
+ """Set the current suction state (used for feedback from sensors)."""
|
|
|
self.suction_state = state
|
|
|
|
|
|
async def connect(self):
|
|
|
+ """
|
|
|
+ Connect to GRBL controller and set movement speed.
|
|
|
+ Handles reset attempts if connection fails.
|
|
|
+ """
|
|
|
try:
|
|
|
await self.grbl_handler.connect()
|
|
|
except RuntimeError as e:
|
|
|
@@ -111,21 +118,22 @@ class RobotController:
|
|
|
except:
|
|
|
await self.grbl_handler.reset_usb()
|
|
|
|
|
|
- await self.movement.set_speed(self.config.movement.speed)
|
|
|
-
|
|
|
async def cleanup(self):
|
|
|
- """Cleanup resources on shutdown"""
|
|
|
+ """Cleanup resources on shutdown (GRBL, MQTT)."""
|
|
|
await self.grbl_handler.close()
|
|
|
self.mqtt_handler.cleanup()
|
|
|
|
|
|
def add_cell(self, cell_id: int) -> Cell:
|
|
|
+ """Add a new cell to the internal cell dictionary."""
|
|
|
self.cells[cell_id] = Cell(cell_id)
|
|
|
return self.cells[cell_id]
|
|
|
|
|
|
def get_cell_by_id(self, cell_id: int) -> Optional[Cell]:
|
|
|
+ """Retrieve a cell by its ID."""
|
|
|
return self.cells.get(cell_id, None)
|
|
|
|
|
|
def get_device_by_id(self, device_id: str) -> Optional[DeviceConfig]:
|
|
|
+ """Retrieve a measurement device by its ID."""
|
|
|
for device in self.devices:
|
|
|
if device.id == device_id:
|
|
|
return device
|
|
|
@@ -133,6 +141,7 @@ class RobotController:
|
|
|
return None
|
|
|
|
|
|
def get_slot_by_id(self, device_id: str, slot_id: int) -> Optional[SlotConfig]:
|
|
|
+ """Retrieve a slot by device and slot ID."""
|
|
|
try:
|
|
|
device = self.get_device_by_id(device_id)
|
|
|
if not device:
|
|
|
@@ -143,6 +152,10 @@ class RobotController:
|
|
|
return None
|
|
|
|
|
|
def get_next_free_slot(self) -> Optional[SlotConfig]:
|
|
|
+ """
|
|
|
+ Find and return the next available (unoccupied) slot.
|
|
|
+ Returns None if all slots are occupied.
|
|
|
+ """
|
|
|
for device in self.devices:
|
|
|
for slot in device.slots:
|
|
|
if not slot.occupied:
|
|
|
@@ -151,8 +164,12 @@ class RobotController:
|
|
|
|
|
|
logger.warning("No free slots available")
|
|
|
return None
|
|
|
-
|
|
|
+
|
|
|
async def process_finished_measurement(self):
|
|
|
+ """
|
|
|
+ Process the next finished measurement from the work queue.
|
|
|
+ Picks the cell from the slot and sorts it based on result.
|
|
|
+ """
|
|
|
if not self.work_queue:
|
|
|
logger.info("No finished measurements in queue")
|
|
|
return
|
|
|
@@ -171,7 +188,6 @@ class RobotController:
|
|
|
return
|
|
|
await self.pick_cell_from_slot(waiting_slot)
|
|
|
|
|
|
-
|
|
|
if not cell: # Cell not found, create new
|
|
|
cell = Cell(measurement_result.cell_id, cell_status, capacity=measurement_result.capacity)
|
|
|
self.cells[measurement_result.cell_id] = cell
|
|
|
@@ -182,6 +198,10 @@ class RobotController:
|
|
|
await self.sort_cell(cell)
|
|
|
|
|
|
async def pick_cell_from_feeder(self):
|
|
|
+ """
|
|
|
+ Pick a cell from the feeder and update gripper state.
|
|
|
+ Removes one item from the feeder queue.
|
|
|
+ """
|
|
|
if self.gripper_occupied:
|
|
|
logger.error("Gripper already occupied")
|
|
|
return False
|
|
|
@@ -225,6 +245,9 @@ class RobotController:
|
|
|
raise e
|
|
|
|
|
|
async def insert_cell_to_next_available(self, cell: Cell):
|
|
|
+ """
|
|
|
+ Insert a cell into the next available slot.
|
|
|
+ """
|
|
|
if not self.gripper_occupied:
|
|
|
logger.error("Gripper not occupied")
|
|
|
return
|
|
|
@@ -234,6 +257,9 @@ class RobotController:
|
|
|
await self.insert_cell_to_slot(cell, slot)
|
|
|
|
|
|
async def insert_cell_to_slot(self, cell: Cell, slot: SlotConfig):
|
|
|
+ """
|
|
|
+ Insert a cell into a specific slot and start measurement.
|
|
|
+ """
|
|
|
logger.info(f"Inserting cell {cell.id} to {slot}...")
|
|
|
if slot.occupied:
|
|
|
raise RuntimeError(f"{slot} is already occupied")
|
|
|
@@ -246,7 +272,8 @@ class RobotController:
|
|
|
slot_device = self.get_device_by_id(slot.device_id)
|
|
|
if not slot_device:
|
|
|
raise RuntimeError(f"Device {slot.device_id} not found!")
|
|
|
- pos = [sum(el) for el in zip(slot.position, slot_device.position)]
|
|
|
+ pos = tuple(a + b for a, b in zip(slot.position, slot_device.position))
|
|
|
+ assert(len(pos) == 3), "Position must be a 3-tuple (x, y, z)"
|
|
|
safe_pos = (pos[0], pos[1], self.config.movement.safe_height)
|
|
|
logger.info(f"Moving to slot position (safe) {safe_pos}...")
|
|
|
await self.movement.move_to_position(*safe_pos)
|
|
|
@@ -275,6 +302,9 @@ class RobotController:
|
|
|
return True
|
|
|
|
|
|
async def pick_cell_from_slot(self, slot: SlotConfig):
|
|
|
+ """
|
|
|
+ Pick a cell from a given slot and update gripper state.
|
|
|
+ """
|
|
|
if self.gripper_occupied:
|
|
|
logger.error("Gripper already occupied")
|
|
|
return None
|
|
|
@@ -284,7 +314,8 @@ class RobotController:
|
|
|
slot_device = self.get_device_by_id(slot.device_id)
|
|
|
if not slot_device:
|
|
|
raise RuntimeError(f"Device {slot.device_id} not found")
|
|
|
- pos = [sum(el) for el in zip(slot.position, slot_device.position)]
|
|
|
+ pos = tuple(a + b for a, b in zip(slot.position, slot_device.position))
|
|
|
+ assert(len(pos) == 3), "Position must be a 3-tuple (x, y, z)"
|
|
|
safe_pos = (pos[0], pos[1], self.config.movement.safe_height)
|
|
|
logger.info(f"Moving to slot position (safe) {safe_pos}...")
|
|
|
await self.movement.move_to_position(*safe_pos)
|
|
|
@@ -313,6 +344,9 @@ class RobotController:
|
|
|
raise e
|
|
|
|
|
|
async def sort_cell(self, cell: Cell):
|
|
|
+ """
|
|
|
+ Sort a cell into the appropriate magazine based on its capacity/grade.
|
|
|
+ """
|
|
|
if not self.gripper_occupied:
|
|
|
logger.error("Gripper not occupied")
|
|
|
return False
|
|
|
@@ -331,6 +365,10 @@ class RobotController:
|
|
|
return False
|
|
|
|
|
|
async def dropoff_cell(self, defeeder_mag_idx: Optional[int] = None):
|
|
|
+ """
|
|
|
+ Drop off a cell into the specified defeeder magazine.
|
|
|
+ Adds the magazine index to the defeeder queue for async handling.
|
|
|
+ """
|
|
|
if not self.gripper_occupied:
|
|
|
logger.error("Cannot drop off: gripper not occupied")
|
|
|
return
|
|
|
@@ -365,6 +403,10 @@ class RobotController:
|
|
|
logger.info(f"Cell dropped off at defeeder")
|
|
|
|
|
|
def check_cell_voltage(self, voltage, cell_id_str = ""):
|
|
|
+ """
|
|
|
+ Check if the cell voltage is within acceptable range.
|
|
|
+ Returns True if voltage is good, False otherwise.
|
|
|
+ """
|
|
|
if voltage < abs(self.config.feeder.min_voltage):
|
|
|
logger.info(f"Cell {cell_id_str} voltage too low, discarding cell")
|
|
|
return False
|