import asyncio import serial_asyncio from typing import List, Optional import logging import datetime import collections import subprocess import os logger = logging.getLogger(__name__) HOMING_TIMEOUT_S = 80 class GRBLHandler: def __init__(self, port: str, baudrate: int): self.port = port self.debug = port == "debug" self.baudrate = baudrate self.controller_active = asyncio.Event() self.reader : Optional[asyncio.StreamReader] = None self.writer : Optional[asyncio.StreamWriter] = None self.position_callbacks = [] async def connect(self): """Connect to GRBL controller""" if self.debug: logger.debug("GRBLHandler is not connected in debug mode") return logger.info(f"Connecting to GRBL ({self.port}, {self.baudrate})...") try: self.reader, self.writer = await serial_asyncio.open_serial_connection( url = self.port, baudrate = self.baudrate ) except serial_asyncio.serial.SerialException as e: raise RuntimeError(f"Failed to connect to robot: {str(e)}") init_response = await self._process_response() if not any(isinstance(response, str) and "Grbl" in response for response in init_response): raise RuntimeError("Failed to connect to GRBL") logger.info("GRBL connected.") #################### # Set GRBL settings: #################### config_path = "robot_control/config/grbl_config.txt" if os.path.exists(config_path): logger.info(f"Loading GRBL config from {config_path}") with open(config_path, "r") as f: config_lines = [line.strip() for line in f if line.strip() and not line.strip().startswith("#")] if config_lines: await self.send_gcode(config_lines) else: logger.warning(f"GRBL config file not found: {config_path}") # Run homing cycle ($H) # Set absolute positioning mode (G90) # Set units to millimeters (G21) await self.send_gcode(['$H'], HOMING_TIMEOUT_S) await self.send_gcode(['G90', 'G21']) # if current pos not 0 set working pos to current pos # (should not be done every time to save eeprom writes) status_response = await self.get_status() current_pos = (0.0, 0.0, 0.0) if status_response and 'WPos:' in status_response: pos_str = status_response.split('WPos:')[1].split('|')[0] x, y, z = map(float, pos_str.split(',')) current_pos = (x, y, z) # If not at (0,0,0), set working position to current position if any(abs(coord) > 1e-6 for coord in current_pos): await self.send_gcode([f'G10 L20 P1 X0 Y0 Z0']) logger.info(f"Set working position to current position: {current_pos}") async def reset(self): """Reset GRBL controller""" if self.debug: logger.debug("GRBLHandler is not reset in debug mode") return if self.writer: self.writer.write(b"\x18") # Soft-reset GRBL await self.writer.drain() await asyncio.sleep(0.5) # Wait for GRBL to reset await self.close() await self.connect() # Reconnect after reset async def reset_usb(self): """ Power cycle the USB port using uhubctl. Requires uhubctl to be installed and accessible. """ if self.debug: logger.debug("USB reset skipped in debug mode") return cmd = ["uhubctl", "-l", "1-1", "-a", "cycle", "-d", "2"] logger.info(f"Resetting USB with: {' '.join(cmd)}") await asyncio.sleep(0.5) # Wait for GRBL to reset await self.connect() # Reconnect after reset try: result = subprocess.run(cmd, capture_output=True, text=True, check=True) logger.info(f"USB reset output: {result.stdout.strip()}") except subprocess.CalledProcessError as e: logger.error(f"Failed to reset USB: {e.stderr.strip()}") raise RuntimeError(f"USB reset failed: {e.stderr.strip()}") async def send_gcode(self, commands: List[str], timeout_s = 6): """Send GCODE commands""" if self.debug: logger.debug(f"G-Code commands [{*commands,}] not sent in debug mode") return if not self.writer: logger.error("Writer is not initialized. Cannot send G-Code commands.") return self.controller_active.set() try: for cmd in commands: logger.debug(f"Sending G-Code command: {cmd}") self.writer.write(f"{cmd}\n".encode()) await self.writer.drain() if not (await self._process_response(timeout_s)): raise RuntimeError("Did not receive response from GRBL") except RuntimeError as e: logger.error(f"Failed to send G-Code commands: {str(e)}") finally: self.controller_active.clear() async def get_status(self): """Get current GRBL status Returns: str: Status response from GRBL, or None if in debug mode """ if self.debug: return None if not self.writer: return None response_lines = None try: self.writer.write(b"?") # '?' command returns status report await self.writer.drain() response_lines = str(await self._process_response()) except Exception as e: logger.error(f"Failed to get status: {str(e)}") return None if not response_lines: return None for line in response_lines: if not 'ALARM' in line: continue if ":1" in line: raise RuntimeError("Hard Limit was triggered!") raise RuntimeError(f"Grbl threw ALARM: {line}") return response_lines async def wait_until_idle(self, timeout_s, position: Optional[tuple[float,float,float]] = None): """Wait until GRBL reports idle status Args: timeout_s: Timeout in seconds position: Optional list of 3 floats that will be updated with current position """ if self.debug: await asyncio.sleep(1) return start = datetime.datetime.now() # response = await self.get_status() # First response can still be idle while True: response = await self.get_status() if response and 'WPos:' in response: # Parse position from status reports () pos_str = response.split('WPos:')[1].split('|')[0] if position is not None: x, y, z = map(float, pos_str.split(',')) position = (x,y,z) if response and "Idle" in response: logger.debug("Movement complete.\nContinuing...") break now = datetime.datetime.now() if (now - start).total_seconds() > timeout_s: logger.error("Waiting on idle took too long!") raise TimeoutError("GRBL did not report idle status") await asyncio.sleep(0.5) # Async delay to prevent flooding async def send_and_wait_gcode(self, commands: List[str], timeout_s=60, position: Optional[tuple[float,float,float]] = None): """Send GCODE commands and wait until machine is idle""" await self.send_gcode(commands) await asyncio.sleep(0.2) # Delay to allow GRBL to process commands await self.wait_until_idle(timeout_s, position) def _check_pos_callbacks(self, line): # Parse position from status reports () if line.startswith('<') and 'WPos:' in line: pos_str = line.split('WPos:')[1].split('|')[0] x, y, z = map(float, pos_str.split(',')) self.current_position = (x,y,z) # Notify all registered callbacks for callback in self.position_callbacks: callback(self.current_position) async def _process_response(self, timeout_s=6.0) -> list[str]: """Process GRBL responses""" response_lines : list[str] = [] if not self.reader: return response_lines first = True while True: try: current_timeout = timeout_s if first else 0.1 line = await asyncio.wait_for(self.reader.readuntil(), timeout=current_timeout) decoded = line.strip().decode("utf-8") self._check_pos_callbacks(decoded) logger.debug(f"G-Code response: {decoded}") response_lines.append(decoded) first = False except asyncio.TimeoutError: # No more data available right now break except Exception as e: raise RuntimeError(f"Failed to process response: {e}") if not response_lines: logger.warning(f"No GRBL response received! ({timeout_s}s)") return response_lines def register_position_callback(self, callback): """Register callback for position updates Args: callback: Function taking tuple with X,Y,Z coordinates """ self.position_callbacks.append(callback) async def close(self): """Close connection""" if self.writer: self.writer.close() await self.writer.wait_closed() self.reader = None self.writer = None