import asyncio
import collections
import datetime
from typing import Callable, List, Optional, Tuple

import serial_asyncio

from robot_control.src.utils.logging import LoggerSingleton

MOVE_RATE = 50
# NOTE(review): name is misspelled, but kept as-is because other modules may
# import it.
APPROACHE_RATE = 10
CELL_CIRCLE_RADIUS = 0.35

logger = LoggerSingleton.get_logger()


class GRBLHandler:
    """Asynchronous serial-port client for a GRBL motion controller.

    When ``port`` is the literal string ``"debug"`` the handler runs in debug
    mode: no serial connection is opened and G-code is only logged.
    """

    # Seconds to wait for a single line from GRBL before giving up.
    _RESPONSE_TIMEOUT_S = 4.0

    def __init__(self, port: str, baudrate: int):
        """Store connection settings; no I/O is performed here.

        Args:
            port: Serial port URL, or ``"debug"`` to enable debug mode.
            baudrate: Serial baud rate.
        """
        self.port = port
        self.debug = port == "debug"
        self.baudrate = baudrate
        # Set while send_gcode() is writing commands, cleared afterwards.
        self.controller_active = asyncio.Event()
        self.reader = None
        self.writer = None
        self.position_callbacks = []
        # Last machine position parsed from a status report; None until the
        # first report arrives (previously the attribute did not exist yet).
        self.current_position: Optional[Tuple[float, float, float]] = None

    async def connect(self) -> None:
        """Connect to GRBL controller

        Opens the serial connection, checks that one of the first three
        received lines contains "Grbl" and sends the initial setup commands.
        Does nothing in debug mode.

        Raises:
            serial_asyncio.serial.SerialException: If the port cannot be opened.
            RuntimeError: If no "Grbl" greeting is found in the first three
                responses.
        """
        if self.debug:
            logger.debug("GRBLHandler is not connected in debug mode")
            return

        logger.info("Connecting to GRBL...")
        try:
            self.reader, self.writer = await serial_asyncio.open_serial_connection(
                url=self.port,
                baudrate=self.baudrate,
            )
        except serial_asyncio.serial.SerialException as e:
            raise serial_asyncio.serial.SerialException(
                f"Failed to connect to robot: {str(e)}"
            ) from e

        init_response = []
        for _ in range(3):  # Flush initial responses
            init_response.append(await self._process_response())

        # _process_response() returns a str, or None on timeout. The previous
        # check used collections.Iterable, which no longer exists on
        # Python >= 3.10.
        if not any(
            isinstance(response, str) and "Grbl" in response
            for response in init_response
        ):
            raise RuntimeError("Failed to connect to GRBL")
        logger.info("GRBL connected.")

        # Enable homing ($22=1)
        # Run homing cycle ($H)
        # Set absolute positioning mode (G90)
        # Set units to millimeters (G21)
        # The full sequence with homing would be ['$22=1', '$H', 'G90', 'G21'];
        # '$X' is sent instead of '$H', which skips homing.
        await self.send_gcode(['$22=1', '$X', 'G90', 'G21'])

    async def send_gcode(self, commands: List[str]) -> None:
        """Send GCODE commands

        Each command is written on its own line and one response line is
        awaited per command. A missing response stops the batch; the error is
        logged and not re-raised. In debug mode the commands are only logged.

        Args:
            commands: G-code / GRBL commands, without trailing newline.
        """
        if self.debug:
            logger.debug(f"G-Code commands [{*commands,}] not sent in debug mode")
            return

        self.controller_active.set()
        try:
            for cmd in commands:
                logger.debug(f"Sending G-Code command: {cmd}")
                self.writer.write(f"{cmd}\n".encode())
                await self.writer.drain()
                if not (await self._process_response()):
                    raise RuntimeError("Did not receive response from GRBL")
        except RuntimeError as e:
            logger.error(f"Failed to send G-Code commands: {str(e)}")
        finally:
            self.controller_active.clear()

    async def get_status(self) -> Optional[str]:
        """Get current GRBL status

        Returns:
            str: The two response lines that follow a '?' request, concatenated.
            None is returned in debug mode, when not connected, or when both
            reads timed out.
        """
        if self.debug or not self.writer:
            return None

        self.writer.write(b"?\n")
        await self.writer.drain()

        # '?' command returns status report and 'ok'
        first = await self._process_response()
        second = await self._process_response()

        # Either read may time out and yield None; concatenating None with a
        # str used to raise TypeError, so only the received parts are joined.
        received = [part for part in (first, second) if part is not None]
        if not received:
            return None
        return "".join(received)

    async def wait_until_idle(
        self, timeout_s, position: Optional[list[float]] = None
    ) -> None:
        """Wait until GRBL reports idle status

        Polls the status every 0.2 s. In debug mode it just sleeps for one
        second.

        Args:
            timeout_s: Timeout in seconds
            position: Optional list of 3 floats that will be updated with current position

        Raises:
            TimeoutError: If "Idle" is not reported within ``timeout_s`` seconds.
        """
        if self.debug:
            await asyncio.sleep(1)
            return

        start = datetime.datetime.now()
        while True:
            response = await self.get_status()
            if response:
                if position is not None and 'MPos:' in response:
                    # Assign in place: rebinding the parameter name would never
                    # reach the caller's list.
                    position[:] = self._parse_mpos(response)
                if "Idle" in response:
                    logger.debug("Movement complete.\nContinuing...")
                    break

            now = datetime.datetime.now()
            if (now - start).total_seconds() > timeout_s:
                logger.error("Waiting on idle took too long!")
                raise TimeoutError("GRBL did not report idle status")
            await asyncio.sleep(0.2)  # Async delay to prevent flooding

    async def send_and_wait_gcode(
        self,
        commands: List[str],
        timeout_s=60,
        position: Optional[list[float]] = None,
    ) -> None:
        """Send GCODE commands and wait until machine is idle

        Args:
            commands: Commands passed on to send_gcode().
            timeout_s: Timeout in seconds passed on to wait_until_idle().
            position: Optional list passed on to wait_until_idle(), which
                updates it in place with the reported position.
        """
        await self.send_gcode(commands)
        await asyncio.sleep(0.2)  # Delay to allow GRBL to process commands
        await self.wait_until_idle(timeout_s, position)

    @staticmethod
    def _parse_mpos(report: str) -> Tuple[float, float, float]:
        """Extract the three coordinates that follow 'MPos:' in a status report."""
        pos_str = report.split('MPos:')[1].split('|')[0]
        x, y, z = map(float, pos_str.split(','))
        return (x, y, z)

    async def _process_response(self) -> Optional[str]:
        """Process GRBL responses

        Reads one line from the controller. If it is a status report containing
        'MPos:', the position is stored in ``current_position`` and every
        registered callback is called with it.

        Returns:
            The stripped, decoded line, or None when there is no reader or the
            read timed out.

        Raises:
            RuntimeError: If reading, decoding, parsing or a callback fails.
        """
        if not self.reader:
            return None

        try:
            response = await asyncio.wait_for(
                self.reader.readuntil(), timeout=self._RESPONSE_TIMEOUT_S
            )
            decoded = response.strip().decode("utf-8")
            logger.debug(f"G-Code response: {decoded}")

            # Parse position from status reports
            if decoded.startswith('<') and 'MPos:' in decoded:
                self.current_position = self._parse_mpos(decoded)
                # Notify all registered callbacks
                for callback in self.position_callbacks:
                    callback(self.current_position)

            return decoded
        except asyncio.TimeoutError:
            logger.warning("Timeout waiting for GRBL response")
            return None
        except Exception as e:
            raise RuntimeError(f"Failed to process response: {e}") from e

    def register_position_callback(self, callback: Callable) -> None:
        """Register callback for position updates

        Args:
            callback: Function taking tuple with X,Y,Z coordinates
        """
        self.position_callbacks.append(callback)

    async def close(self) -> None:
        """Close connection"""
        if self.writer:
            self.writer.close()
            await self.writer.wait_closed()