grbl_handler.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281
  1. import asyncio
  2. import serial_asyncio
  3. from typing import List, Optional
  4. import logging
  5. import datetime
  6. import subprocess
  7. import os
  8. logger = logging.getLogger(__name__)
  9. HOMING_TIMEOUT_S = 80
  10. class GRBLHandler:
  11. def __init__(self, port: str, baudrate: int):
  12. self.port = port
  13. self.debug = port == "debug"
  14. self.baudrate = baudrate
  15. self.controller_active = asyncio.Event()
  16. self.reader : Optional[asyncio.StreamReader] = None
  17. self.writer : Optional[asyncio.StreamWriter] = None
  18. self.position_callbacks = []
  19. async def connect(self):
  20. """Connect to GRBL controller"""
  21. if self.debug:
  22. logger.debug("GRBLHandler is not connected in debug mode")
  23. return
  24. logger.info(f"Connecting to GRBL ({self.port}, {self.baudrate})...")
  25. try:
  26. self.reader, self.writer = await serial_asyncio.open_serial_connection(
  27. url = self.port,
  28. baudrate = self.baudrate
  29. )
  30. except serial_asyncio.serial.SerialException as e:
  31. raise RuntimeError(f"Failed to connect to robot: {str(e)}")
  32. init_response = await self._process_response()
  33. if not any(isinstance(response, str) and "Grbl" in response for response in init_response):
  34. raise RuntimeError("Failed to connect to GRBL")
  35. logger.info("GRBL connected.")
  36. ####################
  37. # Set GRBL settings:
  38. ####################
  39. config_path = "robot_control/config/grbl_config.txt"
  40. if os.path.exists(config_path):
  41. logger.info(f"Loading GRBL config from {config_path}")
  42. with open(config_path, "r") as f:
  43. config_lines = [line.strip() for line in f if line.strip() and not line.strip().startswith("#")]
  44. if config_lines:
  45. await self.send_gcode(config_lines)
  46. else:
  47. logger.warning(f"GRBL config file not found: {config_path}")
  48. # Run homing cycle ($H)
  49. # Set absolute positioning mode (G90)
  50. # Set units to millimeters (G21)
  51. await self.send_gcode(['$H'], HOMING_TIMEOUT_S)
  52. await self.send_gcode(['G90', 'G21'])
  53. # if current pos not 0 set working pos to current pos
  54. # (should not be done every time to save eeprom writes)
  55. status_response = await self.get_status()
  56. current_pos = (0.0, 0.0, 0.0)
  57. if status_response and 'WPos:' in status_response:
  58. pos_str = status_response.split('WPos:')[1].split('|')[0]
  59. x, y, z = map(float, pos_str.split(','))
  60. current_pos = (x, y, z)
  61. # If not at (0,0,0), set working position to current position
  62. if any(abs(coord) > 1e-6 for coord in current_pos):
  63. await self.send_gcode([f'G10 L20 P1 X0 Y0 Z0'])
  64. logger.info(f"Set working position to current position: {current_pos}")
  65. async def reset(self):
  66. """Reset GRBL controller"""
  67. if self.debug:
  68. logger.debug("GRBLHandler is not reset in debug mode")
  69. return
  70. if self.writer:
  71. self.writer.write(b"\x18") # Soft-reset GRBL
  72. await self.writer.drain()
  73. await asyncio.sleep(0.5) # Wait for GRBL to reset
  74. await self.close()
  75. await self.connect() # Reconnect after reset
  76. async def reset_usb(self):
  77. """
  78. Power cycle the USB port using uhubctl.
  79. Requires uhubctl to be installed and accessible.
  80. """
  81. if self.debug:
  82. logger.debug("USB reset skipped in debug mode")
  83. return
  84. cmd = ["uhubctl", "-l", "1-1", "-a", "cycle", "-d", "2"]
  85. logger.info(f"Resetting USB with: {' '.join(cmd)}")
  86. await asyncio.sleep(0.5) # Wait for GRBL to reset
  87. await self.connect() # Reconnect after reset
  88. try:
  89. result = subprocess.run(cmd, capture_output=True, text=True, check=True)
  90. logger.info(f"USB reset output: {result.stdout.strip()}")
  91. except subprocess.CalledProcessError as e:
  92. logger.error(f"Failed to reset USB: {e.stderr.strip()}")
  93. raise RuntimeError(f"USB reset failed: {e.stderr.strip()}")
  94. async def send_gcode(self, commands: List[str], timeout_s = 6):
  95. """Send GCODE commands"""
  96. if self.debug:
  97. logger.debug(f"G-Code commands [{*commands,}] not sent in debug mode")
  98. return
  99. if not self.writer:
  100. logger.error("Writer is not initialized. Cannot send G-Code commands.")
  101. return
  102. self.controller_active.set()
  103. try:
  104. for cmd in commands:
  105. logger.debug(f"Sending G-Code command: {cmd}")
  106. self.writer.write(f"{cmd}\n".encode())
  107. await self.writer.drain()
  108. if not (await self._process_response(timeout_s)):
  109. raise RuntimeError("Did not receive response from GRBL")
  110. except RuntimeError as e:
  111. if "ALARM" in str(e):
  112. raise
  113. logger.error(f"Failed to send G-Code commands: {str(e)}")
  114. finally:
  115. self.controller_active.clear()
  116. async def get_status(self):
  117. """Get current GRBL status
  118. Returns:
  119. str: Status response from GRBL, or None if in debug mode
  120. """
  121. if self.debug:
  122. return None
  123. if not self.writer:
  124. return None
  125. response_lines = None
  126. try:
  127. self.writer.write(b"?") # '?' command returns status report
  128. await self.writer.drain()
  129. response_lines = str(await self._process_response())
  130. except Exception as e:
  131. logger.error(f"Failed to get status: {str(e)}")
  132. return None
  133. if not response_lines:
  134. return None
  135. return response_lines
  136. async def wait_until_idle(self, timeout_s, position: Optional[tuple[float,float,float]] = None):
  137. """Wait until GRBL reports idle status
  138. Args:
  139. timeout_s: Timeout in seconds
  140. position: Optional list of 3 floats that will be updated with current position
  141. """
  142. if self.debug:
  143. await asyncio.sleep(1)
  144. return
  145. start = datetime.datetime.now()
  146. # response = await self.get_status() # First response can still be idle
  147. while True:
  148. response = await self.get_status()
  149. if response and 'WPos:' in response:
  150. # Parse position from status reports (<Idle|WPos:0.000,0.000,0.000|...>)
  151. pos_str = response.split('WPos:')[1].split('|')[0]
  152. if position is not None:
  153. x, y, z = map(float, pos_str.split(','))
  154. position = (x,y,z)
  155. if response and "Idle" in response:
  156. logger.debug("Movement complete.\nContinuing...")
  157. break
  158. now = datetime.datetime.now()
  159. if (now - start).total_seconds() > timeout_s:
  160. logger.error("Waiting on idle took too long!")
  161. raise TimeoutError("GRBL did not report idle status")
  162. await asyncio.sleep(0.5) # Async delay to prevent flooding
  163. async def send_and_wait_gcode(self, commands: List[str], timeout_s=60, position: Optional[tuple[float,float,float]] = None):
  164. """Send GCODE commands and wait until machine is idle"""
  165. await self.send_gcode(commands)
  166. await asyncio.sleep(0.2) # Delay to allow GRBL to process commands
  167. await self.wait_until_idle(timeout_s, position)
  168. def _check_pos_callbacks(self, line):
  169. # Parse position from status reports (<Idle|WPos:0.000,0.000,0.000|...>)
  170. if line.startswith('<') and 'WPos:' in line:
  171. pos_str = line.split('WPos:')[1].split('|')[0]
  172. x, y, z = map(float, pos_str.split(','))
  173. self.current_position = (x,y,z)
  174. # Notify all registered callbacks
  175. for callback in self.position_callbacks:
  176. callback(self.current_position)
  177. async def _process_response(self, timeout_s=6.0) -> list[str]:
  178. """Process GRBL responses"""
  179. response_lines : list[str] = []
  180. if not self.reader:
  181. return response_lines
  182. first = True
  183. while True:
  184. try:
  185. current_timeout = timeout_s if first else 0.1
  186. line = await asyncio.wait_for(self.reader.readuntil(), timeout=current_timeout)
  187. decoded = line.strip().decode("utf-8")
  188. self._check_pos_callbacks(decoded)
  189. logger.debug(f"G-Code response: {decoded}")
  190. response_lines.append(decoded)
  191. first = False
  192. except asyncio.TimeoutError:
  193. # No more data available right now
  194. break
  195. except Exception as e:
  196. raise RuntimeError(f"Failed to process response: {e}")
  197. if not response_lines:
  198. logger.warning(f"No GRBL response received! ({timeout_s}s)")
  199. alarm_messages = {
  200. "1": "Hard limit has been triggered. Machine position is likely lost due to sudden halt. Re-homing is highly recommended.",
  201. "2": "Soft limit: G-code motion target exceeds machine travel. Machine position retained. Alarm may be safely unlocked.",
  202. "3": "Abort during cycle: Machine position is likely lost due to sudden halt. Re-homing is highly recommended. May be due to issuing g-code commands that exceed the limit of the machine.",
  203. "4": "Probe fail: Probe is not in the expected initial state before starting probe cycle. Move the bit away from the touch plate.",
  204. "5": "Probe fail: Probe did not contact the workpiece within the programmed travel. Move the bit closer to the touch plate.",
  205. "6": "Homing fail: The active homing cycle was reset.",
  206. "7": "Homing fail: Safety door was opened during homing cycle.",
  207. "8": "Homing fail: Pull off travel failed to clear limit switch. Try increasing pull-off setting or check wiring.",
  208. "9": "Homing fail: Could not find limit switch within search distances. Check max travel, pull-off distance, or wiring.",
  209. "10": "EStop asserted: Emergency stop pressed. Unclick E-stop, then clear this alarm to continue.",
  210. "14": "Spindle at speed timeout: Spindle hasn’t gotten up to speed or is wired incorrectly. Check setup and try again.",
  211. "15": "Homing fail: Could not find second limit switch for auto squared axis. Check max travel, pull-off distance, or wiring.",
  212. "17": "Motor fault: Issue encountered with closed loop motor tracking. Position likely lost.",
  213. }
  214. for line in response_lines:
  215. if 'ALARM' not in line:
  216. continue
  217. for code, message in alarm_messages.items():
  218. if f":{code}" in line:
  219. raise RuntimeError(f"Grbl ALARM {code}: {message}")
  220. raise RuntimeError(f"Grbl threw ALARM: {line}")
  221. return response_lines
  222. def register_position_callback(self, callback):
  223. """Register callback for position updates
  224. Args:
  225. callback: Function taking tuple with X,Y,Z coordinates
  226. """
  227. self.position_callbacks.append(callback)
  228. async def close(self):
  229. """Close connection"""
  230. if self.writer:
  231. self.writer.close()
  232. await self.writer.wait_closed()
  233. self.reader = None
  234. self.writer = None