controller.py 8.3 KB


  1. from dataclasses import dataclass
  2. from enum import Enum
  3. from typing import List, Tuple
  4. from utils.config import ConfigParser, SlotConfig
  5. from robot.movement import RobotMovement
  6. from utils.logging import LoggerSingleton
  7. from api.mqtt_handler import MQTTHandler, Device
  8. from api.grbl_handler import GRBLHandler
  9. logger = LoggerSingleton.get_logger()
  10. class CellStatus(Enum):
  11. WAITING = "waiting"
  12. MEASURING = "measuring"
  13. COMPLETED = "completed"
  14. FAILED = "failed"
  15. @dataclass
  16. class Cell:
  17. id: str
  18. status: CellStatus = CellStatus.WAITING
  19. measurement_slot: int = None
  20. capacity: float = None
  21. class DropoffGrade:
  22. def __init__(self, name: str, x: float, y: float, z: float):
  23. self.name = name
  24. self.position = (x, y, z)
  25. class RobotController:
  26. def __init__(self, config: ConfigParser):
  27. self.config = config
  28. self.devices = self.config.get_devices()
  29. self.feeder = self.config.get_feeder()
  30. self.dropoff_grades = self.config.get_dropoff_grades()
  31. self.rejected_grade = self.dropoff_grades['rejected']
  32. self.system_settings = self.config.get_system_settings()
  33. # Initialize robot movement
  34. grbl_config = self.config.get_grbl_config()
  35. self.grbl_handler = GRBLHandler(grbl_config.get('port'), grbl_config.get('baudrate'))
  36. self.movement = RobotMovement(self.grbl_handler)
  37. # Initialize with configured values
  38. self.total_slots = sum(len(device.slots) for device in self.devices)
  39. self.work_queue: List[SlotConfig] = []
  40. self.gripper_occupied = False
  41. # Initialize MQTT handler
  42. mqtt_config = self.config.get_mqtt_config()
  43. self.mqtt_handler = MQTTHandler(
  44. broker=mqtt_config.get('broker'),
  45. port=mqtt_config.get('port'),
  46. username=mqtt_config.get('username'),
  47. password=mqtt_config.get('password'),
  48. )
  49. # Register all devices with MQTT handler
  50. for device in self.devices:
  51. self.mqtt_handler.register_device(
  52. Device(device_id=device.id, num_slots=len(device.slots))
  53. )
  54. async def connect(self):
  55. await self.grbl_handler.connect()
  56. await self.movement.set_speed(self.system_settings.speed)
  57. async def cleanup(self):
  58. """Cleanup resources on shutdown"""
  59. # await self.movement.cleanup() TODO[SG]: Implement cleanup method in movement.py
  60. self.mqtt_handler.cleanup()
  61. async def pick_cell_from_feeder(self):
  62. if self.gripper_occupied:
  63. logger.error("Gripper already occupied")
  64. return False
  65. logger.info("Move to save position and pick cell from feeder...")
  66. feeder_pos = self.feeder.approach_position
  67. x, y, z = feeder_pos
  68. # Move to safe height first
  69. await self.movement.move_to_position(x, y, self.system_settings.safe_height)
  70. # Move to pickup position
  71. await self.movement.move_to_position(x, y, z)
  72. # Grip cell
  73. try:
  74. await self.movement.activate_endeffector()
  75. self.gripper_occupied = True
  76. # Move back to safe height
  77. await self.movement.move_to_position(x, y, self.system_settings.safe_height)
  78. logger.info("Cell picked from feeder")
  79. return True
  80. except Exception as e:
  81. logger.error(f"Failed to pick cell from feeder: {str(e)}")
  82. await self.movement.deactivate_endeffector() # TODO [SG]: Should deactivate here?
  83. self.gripper_occupied = False
  84. raise e
  85. def get_next_free_slot(self):
  86. for device in self.devices:
  87. for slot in device.slots:
  88. if not slot.occupied:
  89. logger.info(f"Next free slot is {slot}")
  90. return slot
  91. logger.error("No free slots available")
  92. return None
  93. async def insert_cell_to_next_available(self, cell: Cell):
  94. if not self.gripper_occupied:
  95. logger.error("Gripper not occupied")
  96. return
  97. slot = self.get_next_free_slot()
  98. if not slot:
  99. return
  100. await self.insert_cell_to_slot(cell, slot)
  101. async def insert_cell_to_slot(self, cell: Cell, slot: SlotConfig):
  102. logger.info(f"Inserting cell {cell.id} to {slot}...")
  103. if slot.occupied:
  104. raise RuntimeError(f"{slot} is already occupied")
  105. if not self.gripper_occupied:
  106. raise RuntimeError("Gripper not occupied")
  107. # Move to slot position
  108. x, y, z = slot.position
  109. # Move to safe height first
  110. await self.movement.move_to_position(x, y, self.system_settings.safe_height)
  111. # Move down to insertion position
  112. await self.movement.move_to_position(x, y, z)
  113. # Release cell
  114. if not await self.movement.deactivate_endeffector():
  115. raise RuntimeError(f"Failed to release cell {cell.id}")
  116. slot.occupied = True
  117. slot.cell_id = cell.id
  118. self.gripper_occupied = False
  119. # Start measurement
  120. cell.status = CellStatus.MEASURING
  121. # cell.measurement_slot = slot.slot_id # TODO[SG]: Make this consistent
  122. self.mqtt_handler.start_measurement(
  123. device_id=slot.device_id,
  124. slot=slot.slot_id,
  125. cell_id=cell.id,
  126. callback=lambda device_id, slot, cell_id, capacity: self.work_queue.append((cell_id, slot, device_id, capacity))
  127. )
  128. # Move back to safe height
  129. await self.movement.move_to_position(x, y, self.system_settings.safe_height)
  130. logger.info(f"Cell {cell.id} inserted to {slot}")
  131. return True
  132. async def collect_cell_from_slot(self, slot: SlotConfig):
  133. if self.gripper_occupied:
  134. logger.error("Gripper already occupied")
  135. return None
  136. x, y, z = slot.position
  137. # Move to safe height first
  138. await self.movement.move_to_position(x, y, self.system_settings.safe_height)
  139. # Move down to collection position
  140. await self.movement.move_to_position(x, y, z)
  141. # Grip cell
  142. try:
  143. if not await self.movement.activate_endeffector():
  144. raise RuntimeError("Failed to grip cell")
  145. self.gripper_occupied = True
  146. cell_id = slot.cell_id
  147. slot.occupied = False
  148. slot.cell_id = None
  149. # Move back to safe height
  150. await self.movement.move_to_position(x, y, self.system_settings.safe_height)
  151. logger.info(f"Cell {cell_id} collected from {slot}")
  152. return cell_id
  153. except Exception as e:
  154. await self.movement.deactivate_endeffector() # Try to deactivate to avoid stuck gripper
  155. self.gripper_occupied = False
  156. raise e
  157. async def sort_cell(self, cell: Cell):
  158. if not self.gripper_occupied:
  159. logger.error("Gripper not occupied")
  160. return False
  161. if cell.status == CellStatus.FAILED:
  162. await self.dropoff_cell(self.rejected_grade)
  163. logger.info(f"Cell {cell.id} sorted to rejected grade")
  164. return True
  165. for name, grade in self.dropoff_grades.items():
  166. if cell.capacity >= grade.capacity_threshold:
  167. await self.dropoff_cell(grade)
  168. self.gripper_occupied = False
  169. logger.info(f"Cell {cell.id} sorted to grade {name}")
  170. return True
  171. logger.error(f"No suitable grade found for cell {cell.id} with capacity {cell.capacity}")
  172. return False
  173. async def dropoff_cell(self, dropoff_grade: DropoffGrade):
  174. if not self.gripper_occupied:
  175. logger.error("Cannot drop off: gripper not occupied")
  176. return
  177. x, y, z = dropoff_grade.position
  178. # Move to safe height first
  179. await self.movement.move_to_position(x, y, self.system_settings.safe_height)
  180. # Move to dropoff position
  181. await self.movement.move_to_position(x, y, z)
  182. # Release cell
  183. if not await self.movement.deactivate_endeffector():
  184. raise RuntimeError("Failed to release cell")
  185. self.gripper_occupied = False
  186. # Move back to safe height
  187. await self.movement.move_to_position(x, y, self.system_settings.safe_height)
  188. logger.info(f"Cell dropped off at grade {dropoff_grade.name}")