| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264 |
- import asyncio
- import serial_asyncio
- from typing import List, Optional
- import logging
- import datetime
- import collections
- import subprocess
- import os
logger = logging.getLogger(__name__)

# Timeout, in seconds, passed to the homing command ($H) when connecting.
HOMING_TIMEOUT_S = 80


class GRBLHandler:
    """Asynchronous driver for a GRBL controller reached over a serial port.

    When ``port`` is the literal string ``"debug"`` the handler never opens a
    connection: commands are only logged and status queries return ``None``.
    """

    # File with GRBL settings/commands that is sent right after connecting.
    DEFAULT_CONFIG_PATH = "robot_control/config/grbl_config.txt"

    # After the first response line arrived, wait at most this long (seconds)
    # for each further line before the response is considered complete.
    _FOLLOW_UP_TIMEOUT_S = 0.1

    def __init__(self, port: str, baudrate: int, config_path: Optional[str] = None):
        """Store the connection parameters; no I/O happens here.

        Args:
            port: Serial device to open, or ``"debug"`` for a dry-run handler.
            baudrate: Serial baud rate.
            config_path: Optional override for the GRBL config file; defaults
                to ``DEFAULT_CONFIG_PATH``.
        """
        self.port = port
        self.debug = port == "debug"
        self.baudrate = baudrate
        self.config_path = config_path or self.DEFAULT_CONFIG_PATH

        # Set while send_gcode() is writing commands, cleared afterwards.
        self.controller_active = asyncio.Event()

        self.reader: Optional[asyncio.StreamReader] = None
        self.writer: Optional[asyncio.StreamWriter] = None
        self.position_callbacks = []
        # Last work position seen in a status report; None until one arrives.
        self.current_position: Optional[tuple[float, float, float]] = None

    async def connect(self):
        """Connect to GRBL controller, apply the config, home and zero.

        Raises:
            RuntimeError: If the serial port cannot be opened or GRBL does
                not answer with its "Grbl" banner. An ALARM reported by the
                status query also surfaces as RuntimeError.
        """
        if self.debug:
            logger.debug("GRBLHandler is not connected in debug mode")
            return

        logger.info("Connecting to GRBL (%s, %s)...", self.port, self.baudrate)
        try:
            self.reader, self.writer = await serial_asyncio.open_serial_connection(
                url=self.port,
                baudrate=self.baudrate,
            )
        except serial_asyncio.serial.SerialException as e:
            raise RuntimeError(f"Failed to connect to robot: {str(e)}") from e

        try:
            init_response = await self._process_response()
        except RuntimeError:
            # Do not leave a half-initialised connection open.
            await self.close()
            raise
        if not any("Grbl" in line for line in init_response):
            await self.close()
            raise RuntimeError("Failed to connect to GRBL")
        logger.info("GRBL connected.")

        await self._apply_config()

        # Run homing cycle ($H), then select absolute positioning (G90)
        # and millimeter units (G21).
        await self.send_gcode(['$H'], HOMING_TIMEOUT_S)
        await self.send_gcode(['G90', 'G21'])

        # Only re-zero the work coordinates when they are not already zero
        # (should not be done every time to save EEPROM writes).
        status_response = await self.get_status()
        current_pos = self._parse_wpos(status_response) or (0.0, 0.0, 0.0)
        if any(abs(coord) > 1e-6 for coord in current_pos):
            await self.send_gcode(['G10 L20 P1 X0 Y0 Z0'])
            logger.info("Set working position to current position: %s", current_pos)

    async def _apply_config(self):
        """Send every non-empty, non-``#`` line of the config file to GRBL."""
        if not os.path.exists(self.config_path):
            logger.warning("GRBL config file not found: %s", self.config_path)
            return

        logger.info("Loading GRBL config from %s", self.config_path)
        with open(self.config_path, "r", encoding="utf-8") as f:
            stripped_lines = (line.strip() for line in f)
            config_lines = [
                line for line in stripped_lines
                if line and not line.startswith("#")
            ]
        if config_lines:
            await self.send_gcode(config_lines)

    async def reset(self):
        """Reset GRBL controller (soft reset) and reconnect."""
        if self.debug:
            logger.debug("GRBLHandler is not reset in debug mode")
            return

        if self.writer:
            self.writer.write(b"\x18")  # Soft-reset GRBL
            await self.writer.drain()
            await asyncio.sleep(0.5)  # Wait for GRBL to reset
        await self.close()

        await self.connect()  # Reconnect after reset

    async def reset_usb(self):
        """
        Power cycle the USB port using uhubctl, then reconnect.
        Requires uhubctl to be installed and accessible.

        Raises:
            RuntimeError: If uhubctl is missing or exits with an error.
        """
        if self.debug:
            logger.debug("USB reset skipped in debug mode")
            return

        cmd = ["uhubctl", "-l", "1-1", "-a", "cycle", "-d", "2"]
        logger.info("Resetting USB with: %s", ' '.join(cmd))

        # Drop the current connection first; it is re-opened after the cycle.
        await self.close()

        try:
            # Run in a worker thread so the event loop is not blocked while
            # uhubctl performs the power cycle.
            result = await asyncio.to_thread(
                subprocess.run, cmd, capture_output=True, text=True, check=True
            )
        except FileNotFoundError as e:
            logger.error("Failed to reset USB: %s", e)
            raise RuntimeError(f"USB reset failed: {e}") from e
        except subprocess.CalledProcessError as e:
            stderr = (e.stderr or "").strip()
            logger.error("Failed to reset USB: %s", stderr)
            raise RuntimeError(f"USB reset failed: {stderr}") from e
        logger.info("USB reset output: %s", result.stdout.strip())

        await asyncio.sleep(0.5)  # Wait for GRBL to reset
        await self.connect()  # Reconnect after reset

    async def send_gcode(self, commands: List[str], timeout_s: float = 6):
        """Send GCODE commands one by one, waiting for a response to each.

        A missing response (or any RuntimeError from response processing) is
        logged and aborts the remaining commands; it is not re-raised.

        Args:
            commands: G-code lines to send, without trailing newline.
            timeout_s: Seconds to wait for the first response line per command.
        """
        if self.debug:
            logger.debug("G-Code commands [%s] not sent in debug mode", tuple(commands))
            return

        if not self.writer:
            logger.error("Writer is not initialized. Cannot send G-Code commands.")
            return

        self.controller_active.set()
        try:
            for cmd in commands:
                logger.debug("Sending G-Code command: %s", cmd)
                self.writer.write(f"{cmd}\n".encode())
                await self.writer.drain()
                if not (await self._process_response(timeout_s)):
                    raise RuntimeError("Did not receive response from GRBL")
        except RuntimeError as e:
            logger.error("Failed to send G-Code commands: %s", e)
        finally:
            self.controller_active.clear()

    async def get_status(self) -> Optional[str]:
        """Get current GRBL status

        Returns:
            str: Response lines from GRBL joined with newlines, or None in
            debug mode, without a connection, on I/O failure or when nothing
            was received.

        Raises:
            RuntimeError: If any response line contains an ALARM.
        """
        if self.debug or not self.writer:
            return None

        try:
            self.writer.write(b"?")  # '?' command returns status report
            await self.writer.drain()
            response_lines = await self._process_response()
        except Exception as e:
            logger.error("Failed to get status: %s", e)
            return None

        if not response_lines:
            return None

        for line in response_lines:
            if 'ALARM' not in line:
                continue
            # Compare the exact code so that e.g. "ALARM:10" is not mistaken
            # for the hard-limit alarm "ALARM:1".
            alarm_code = line.partition('ALARM:')[2].strip()
            if alarm_code == "1":
                raise RuntimeError("Hard Limit was triggered!")
            raise RuntimeError(f"Grbl threw ALARM: {line}")

        return "\n".join(response_lines)

    async def wait_until_idle(self, timeout_s, position: Optional[List[float]] = None):
        """Wait until GRBL reports idle status

        Args:
            timeout_s: Timeout in seconds
            position: Optional list of 3 floats that will be updated in place
                with the current position

        Returns:
            The last (x, y, z) work position parsed from a status report, or
            None if none was seen (always None in debug mode).

        Raises:
            TimeoutError: If GRBL did not report "Idle" within ``timeout_s``.
            RuntimeError: If a status query reports an ALARM.
        """
        if self.debug:
            await asyncio.sleep(1)
            return None

        # The loop clock is monotonic, so wall-clock adjustments cannot
        # shorten or extend the timeout.
        loop = asyncio.get_running_loop()
        start = loop.time()
        last_position = None
        while True:
            response = await self.get_status()
            if response:
                # Parse position from status reports (<Idle|WPos:0.000,0.000,0.000|...>)
                parsed = self._parse_wpos(response)
                if parsed is not None:
                    last_position = parsed
                    if isinstance(position, list):
                        position[:] = parsed
                if "Idle" in response:
                    logger.debug("Movement complete.\nContinuing...")
                    return last_position

            if loop.time() - start > timeout_s:
                logger.error("Waiting on idle took too long!")
                raise TimeoutError("GRBL did not report idle status")

            await asyncio.sleep(0.5)  # Async delay to prevent flooding

    async def send_and_wait_gcode(self, commands: List[str], timeout_s=60, position: Optional[List[float]] = None):
        """Send GCODE commands and wait until machine is idle

        ``timeout_s`` applies to the idle wait only; the commands themselves
        are sent with send_gcode()'s default timeout. Returns whatever
        wait_until_idle() returns.
        """
        await self.send_gcode(commands)
        await asyncio.sleep(0.2)  # Delay to allow GRBL to process commands
        return await self.wait_until_idle(timeout_s, position)

    @staticmethod
    def _parse_wpos(report: Optional[str]) -> Optional[tuple[float, float, float]]:
        """Extract (x, y, z) from the ``WPos:`` field of a status report.

        Returns None when the text is empty, has no ``WPos:`` field, or the
        field does not start with three comma-separated numbers.
        """
        if not report or 'WPos:' not in report:
            return None
        field = report.split('WPos:', 1)[1]
        # The field ends at the next '|' or, when it is the last one, at '>'.
        for terminator in ('|', '>'):
            field = field.split(terminator, 1)[0]
        try:
            x, y, z = (float(part) for part in field.split(',')[:3])
        except ValueError:
            return None
        return (x, y, z)

    def _check_pos_callbacks(self, line):
        """Update ``current_position`` from a status line and notify callbacks."""
        # Only status reports (<Idle|WPos:0.000,0.000,0.000|...>) carry a position.
        if not line.startswith('<'):
            return
        parsed = self._parse_wpos(line)
        if parsed is None:
            return
        self.current_position = parsed

        # Notify all registered callbacks
        for callback in self.position_callbacks:
            callback(self.current_position)

    async def _process_response(self, timeout_s=6.0) -> list[str]:
        """Process GRBL responses

        Waits up to ``timeout_s`` for the first line, then keeps reading
        until no further line arrives within ``_FOLLOW_UP_TIMEOUT_S``.

        Returns:
            The stripped, decoded lines; empty if nothing arrived in time or
            no reader is available.

        Raises:
            RuntimeError: If reading, decoding or a position callback fails.
        """
        response_lines: list[str] = []
        if not self.reader:
            return response_lines

        current_timeout = timeout_s
        while True:
            try:
                line = await asyncio.wait_for(self.reader.readuntil(), timeout=current_timeout)
                # Replace undecodable bytes instead of failing the whole
                # response on a single corrupted character.
                decoded = line.strip().decode("utf-8", errors="replace")
                self._check_pos_callbacks(decoded)
                logger.debug("G-Code response: %s", decoded)
                response_lines.append(decoded)
                current_timeout = self._FOLLOW_UP_TIMEOUT_S
            except asyncio.TimeoutError:
                # No more data available right now
                break
            except Exception as e:
                raise RuntimeError(f"Failed to process response: {e}") from e

        if not response_lines:
            logger.warning("No GRBL response received! (%ss)", timeout_s)
        return response_lines

    def register_position_callback(self, callback):
        """Register callback for position updates

        Args:
            callback: Function taking tuple with X,Y,Z coordinates
        """
        self.position_callbacks.append(callback)

    async def close(self):
        """Close connection"""
        # Clear the handles first so the handler is marked disconnected even
        # if closing the transport raises.
        writer, self.reader, self.writer = self.writer, None, None
        if writer:
            writer.close()
            await writer.wait_closed()
|