from dataclasses import dataclass from enum import Enum from typing import List, Tuple from utils.config import ConfigParser, SlotConfig from robot.movement import RobotMovement from utils.logging import LoggerSingleton from api.mqtt_handler import MQTTHandler, Device from api.grbl_handler import GRBLHandler logger = LoggerSingleton.get_logger() class CellStatus(Enum): WAITING = "waiting" MEASURING = "measuring" COMPLETED = "completed" FAILED = "failed" @dataclass class Cell: id: str status: CellStatus = CellStatus.WAITING measurement_slot: int = None capacity: float = None class DropoffGrade: def __init__(self, name: str, x: float, y: float, z: float): self.name = name self.position = (x, y, z) class RobotController: def __init__(self, config: ConfigParser): self.config = config self.devices = self.config.get_devices() self.feeder = self.config.get_feeder() self.dropoff_grades = self.config.get_dropoff_grades() self.rejected_grade = self.dropoff_grades['rejected'] self.system_settings = self.config.get_system_settings() # Initialize robot movement grbl_config = self.config.get_grbl_config() self.grbl_handler = GRBLHandler(grbl_config.get('port'), grbl_config.get('baudrate')) self.movement = RobotMovement(self.grbl_handler) # Initialize with configured values self.total_slots = sum(len(device.slots) for device in self.devices) self.work_queue: List[SlotConfig] = [] self.gripper_occupied = False # Initialize MQTT handler mqtt_config = self.config.get_mqtt_config() self.mqtt_handler = MQTTHandler( broker=mqtt_config.get('broker'), port=mqtt_config.get('port'), username=mqtt_config.get('username'), password=mqtt_config.get('password'), ) # Register all devices with MQTT handler for device in self.devices: self.mqtt_handler.register_device( Device(device_id=device.id, num_slots=len(device.slots)) ) async def connect(self): await self.grbl_handler.connect() await self.movement.set_speed(self.system_settings.speed) async def cleanup(self): """Cleanup resources on shutdown""" # await self.movement.cleanup() TODO[SG]: Implement cleanup method in movement.py self.mqtt_handler.cleanup() async def pick_cell_from_feeder(self): if self.gripper_occupied: logger.error("Gripper already occupied") return False logger.info("Move to save position and pick cell from feeder...") feeder_pos = self.feeder.approach_position x, y, z = feeder_pos # Move to safe height first await self.movement.move_to_position(x, y, self.system_settings.safe_height) # Move to pickup position await self.movement.move_to_position(x, y, z) # Grip cell try: await self.movement.activate_endeffector() self.gripper_occupied = True # Move back to safe height await self.movement.move_to_position(x, y, self.system_settings.safe_height) logger.info("Cell picked from feeder") return True except Exception as e: logger.error(f"Failed to pick cell from feeder: {str(e)}") await self.movement.deactivate_endeffector() # TODO [SG]: Should deactivate here? self.gripper_occupied = False raise e def get_next_free_slot(self): for device in self.devices: for slot in device.slots: if not slot.occupied: logger.info(f"Next free slot is {slot}") return slot logger.error("No free slots available") return None async def insert_cell_to_next_available(self, cell: Cell): if not self.gripper_occupied: logger.error("Gripper not occupied") return slot = self.get_next_free_slot() if not slot: return await self.insert_cell_to_slot(cell, slot) async def insert_cell_to_slot(self, cell: Cell, slot: SlotConfig): logger.info(f"Inserting cell {cell.id} to {slot}...") if slot.occupied: raise RuntimeError(f"{slot} is already occupied") if not self.gripper_occupied: raise RuntimeError("Gripper not occupied") # Move to slot position x, y, z = slot.position # Move to safe height first await self.movement.move_to_position(x, y, self.system_settings.safe_height) # Move down to insertion position await self.movement.move_to_position(x, y, z) # Release cell if not await self.movement.deactivate_endeffector(): raise RuntimeError(f"Failed to release cell {cell.id}") slot.occupied = True slot.cell_id = cell.id self.gripper_occupied = False # Start measurement cell.status = CellStatus.MEASURING # cell.measurement_slot = slot.slot_id # TODO[SG]: Make this consistent self.mqtt_handler.start_measurement( device_id=slot.device_id, slot=slot.slot_id, cell_id=cell.id, callback=lambda device_id, slot, cell_id, capacity: self.work_queue.append((cell_id, slot, device_id, capacity)) ) # Move back to safe height await self.movement.move_to_position(x, y, self.system_settings.safe_height) logger.info(f"Cell {cell.id} inserted to {slot}") return True async def collect_cell_from_slot(self, slot: SlotConfig): if self.gripper_occupied: logger.error("Gripper already occupied") return None x, y, z = slot.position # Move to safe height first await self.movement.move_to_position(x, y, self.system_settings.safe_height) # Move down to collection position await self.movement.move_to_position(x, y, z) # Grip cell try: if not await self.movement.activate_endeffector(): raise RuntimeError("Failed to grip cell") self.gripper_occupied = True cell_id = slot.cell_id slot.occupied = False slot.cell_id = None # Move back to safe height await self.movement.move_to_position(x, y, self.system_settings.safe_height) logger.info(f"Cell {cell_id} collected from {slot}") return cell_id except Exception as e: await self.movement.deactivate_endeffector() # Try to deactivate to avoid stuck gripper self.gripper_occupied = False raise e async def sort_cell(self, cell: Cell): if not self.gripper_occupied: logger.error("Gripper not occupied") return False if cell.status == CellStatus.FAILED: await self.dropoff_cell(self.rejected_grade) logger.info(f"Cell {cell.id} sorted to rejected grade") return True for name, grade in self.dropoff_grades.items(): if cell.capacity >= grade.capacity_threshold: await self.dropoff_cell(grade) self.gripper_occupied = False logger.info(f"Cell {cell.id} sorted to grade {name}") return True logger.error(f"No suitable grade found for cell {cell.id} with capacity {cell.capacity}") return False async def dropoff_cell(self, dropoff_grade: DropoffGrade): if not self.gripper_occupied: logger.error("Cannot drop off: gripper not occupied") return x, y, z = dropoff_grade.position # Move to safe height first await self.movement.move_to_position(x, y, self.system_settings.safe_height) # Move to dropoff position await self.movement.move_to_position(x, y, z) # Release cell if not await self.movement.deactivate_endeffector(): raise RuntimeError("Failed to release cell") self.gripper_occupied = False # Move back to safe height await self.movement.move_to_position(x, y, self.system_settings.safe_height) logger.info(f"Cell dropped off at grade {dropoff_grade.name}")