| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220 |
- from dataclasses import dataclass
- from enum import Enum
- from typing import List, Tuple
- from utils.config import ConfigParser, SlotConfig
- from robot.movement import RobotMovement
- from utils.logging import LoggerSingleton
- from api.mqtt_handler import MQTTHandler, Device
- from api.grbl_handler import GRBLHandler
- logger = LoggerSingleton.get_logger()
- class CellStatus(Enum):
- WAITING = "waiting"
- MEASURING = "measuring"
- COMPLETED = "completed"
- FAILED = "failed"
- @dataclass
- class Cell:
- id: str
- status: CellStatus = CellStatus.WAITING
- measurement_slot: int = None
- capacity: float = None
- class DropoffGrade:
- def __init__(self, name: str, x: float, y: float, z: float):
- self.name = name
- self.position = (x, y, z)
- class RobotController:
- def __init__(self, config: ConfigParser):
- self.config = config
- self.devices = self.config.get_devices()
- self.feeder = self.config.get_feeder()
- self.dropoff_grades = self.config.get_dropoff_grades()
- self.rejected_grade = self.dropoff_grades['rejected']
- self.system_settings = self.config.get_system_settings()
-
- # Initialize robot movement
- grbl_config = self.config.get_grbl_config()
- self.grbl_handler = GRBLHandler(grbl_config.get('port'), grbl_config.get('baudrate'))
- self.movement = RobotMovement(self.grbl_handler)
-
- # Initialize with configured values
- self.total_slots = sum(len(device.slots) for device in self.devices)
- self.work_queue: List[SlotConfig] = []
- self.gripper_occupied = False
-
- # Initialize MQTT handler
- mqtt_config = self.config.get_mqtt_config()
- self.mqtt_handler = MQTTHandler(
- broker=mqtt_config.get('broker'),
- port=mqtt_config.get('port'),
- username=mqtt_config.get('username'),
- password=mqtt_config.get('password'),
- )
-
- # Register all devices with MQTT handler
- for device in self.devices:
- self.mqtt_handler.register_device(
- Device(device_id=device.id, num_slots=len(device.slots))
- )
- async def connect(self):
- await self.grbl_handler.connect()
- await self.movement.set_speed(self.system_settings.speed)
- async def cleanup(self):
- """Cleanup resources on shutdown"""
- # await self.movement.cleanup() TODO[SG]: Implement cleanup method in movement.py
- self.mqtt_handler.cleanup()
- async def pick_cell_from_feeder(self):
- if self.gripper_occupied:
- logger.error("Gripper already occupied")
- return False
-
- logger.info("Move to save position and pick cell from feeder...")
- feeder_pos = self.feeder.approach_position
- x, y, z = feeder_pos
- # Move to safe height first
- await self.movement.move_to_position(x, y, self.system_settings.safe_height)
- # Move to pickup position
- await self.movement.move_to_position(x, y, z)
- # Grip cell
- try:
- await self.movement.activate_endeffector()
- self.gripper_occupied = True
- # Move back to safe height
- await self.movement.move_to_position(x, y, self.system_settings.safe_height)
- logger.info("Cell picked from feeder")
- return True
- except Exception as e:
- logger.error(f"Failed to pick cell from feeder: {str(e)}")
- await self.movement.deactivate_endeffector() # TODO [SG]: Should deactivate here?
- self.gripper_occupied = False
- raise e
- def get_next_free_slot(self):
- for device in self.devices:
- for slot in device.slots:
- if not slot.occupied:
- logger.info(f"Next free slot is {slot}")
- return slot
-
- logger.error("No free slots available")
- return None
- async def insert_cell_to_next_available(self, cell: Cell):
- if not self.gripper_occupied:
- logger.error("Gripper not occupied")
- return
- slot = self.get_next_free_slot()
- if not slot:
- return
- await self.insert_cell_to_slot(cell, slot)
- async def insert_cell_to_slot(self, cell: Cell, slot: SlotConfig):
- logger.info(f"Inserting cell {cell.id} to {slot}...")
- if slot.occupied:
- raise RuntimeError(f"{slot} is already occupied")
- if not self.gripper_occupied:
- raise RuntimeError("Gripper not occupied")
- # Move to slot position
- x, y, z = slot.position
- # Move to safe height first
- await self.movement.move_to_position(x, y, self.system_settings.safe_height)
- # Move down to insertion position
- await self.movement.move_to_position(x, y, z)
- # Release cell
- if not await self.movement.deactivate_endeffector():
- raise RuntimeError(f"Failed to release cell {cell.id}")
-
- slot.occupied = True
- slot.cell_id = cell.id
- self.gripper_occupied = False
- # Start measurement
- cell.status = CellStatus.MEASURING
- # cell.measurement_slot = slot.slot_id # TODO[SG]: Make this consistent
- self.mqtt_handler.start_measurement(
- device_id=slot.device_id,
- slot=slot.slot_id,
- cell_id=cell.id,
- callback=lambda device_id, slot, cell_id, capacity: self.work_queue.append((cell_id, slot, device_id, capacity))
- )
-
- # Move back to safe height
- await self.movement.move_to_position(x, y, self.system_settings.safe_height)
- logger.info(f"Cell {cell.id} inserted to {slot}")
- return True
- async def collect_cell_from_slot(self, slot: SlotConfig):
- if self.gripper_occupied:
- logger.error("Gripper already occupied")
- return None
-
- x, y, z = slot.position
- # Move to safe height first
- await self.movement.move_to_position(x, y, self.system_settings.safe_height)
- # Move down to collection position
- await self.movement.move_to_position(x, y, z)
- # Grip cell
- try:
- if not await self.movement.activate_endeffector():
- raise RuntimeError("Failed to grip cell")
- self.gripper_occupied = True
- cell_id = slot.cell_id
- slot.occupied = False
- slot.cell_id = None
-
- # Move back to safe height
- await self.movement.move_to_position(x, y, self.system_settings.safe_height)
- logger.info(f"Cell {cell_id} collected from {slot}")
- return cell_id
- except Exception as e:
- await self.movement.deactivate_endeffector() # Try to deactivate to avoid stuck gripper
- self.gripper_occupied = False
- raise e
- async def sort_cell(self, cell: Cell):
- if not self.gripper_occupied:
- logger.error("Gripper not occupied")
- return False
- if cell.status == CellStatus.FAILED:
- await self.dropoff_cell(self.rejected_grade)
- logger.info(f"Cell {cell.id} sorted to rejected grade")
- return True
- for name, grade in self.dropoff_grades.items():
- if cell.capacity >= grade.capacity_threshold:
- await self.dropoff_cell(grade)
- self.gripper_occupied = False
- logger.info(f"Cell {cell.id} sorted to grade {name}")
- return True
- logger.error(f"No suitable grade found for cell {cell.id} with capacity {cell.capacity}")
- return False
- async def dropoff_cell(self, dropoff_grade: DropoffGrade):
- if not self.gripper_occupied:
- logger.error("Cannot drop off: gripper not occupied")
- return
-
- x, y, z = dropoff_grade.position
- # Move to safe height first
- await self.movement.move_to_position(x, y, self.system_settings.safe_height)
- # Move to dropoff position
- await self.movement.move_to_position(x, y, z)
- # Release cell
- if not await self.movement.deactivate_endeffector():
- raise RuntimeError("Failed to release cell")
- self.gripper_occupied = False
- # Move back to safe height
- await self.movement.move_to_position(x, y, self.system_settings.safe_height)
- logger.info(f"Cell dropped off at grade {dropoff_grade.name}")
|