grbl_handler.py 9.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264
  1. import asyncio
  2. import serial_asyncio
  3. from typing import List, Optional
  4. import logging
  5. import datetime
  6. import collections
  7. import subprocess
  8. import os
  9. logger = logging.getLogger(__name__)
  10. HOMING_TIMEOUT_S = 80
  11. class GRBLHandler:
  12. def __init__(self, port: str, baudrate: int):
  13. self.port = port
  14. self.debug = port == "debug"
  15. self.baudrate = baudrate
  16. self.controller_active = asyncio.Event()
  17. self.reader : Optional[asyncio.StreamReader] = None
  18. self.writer : Optional[asyncio.StreamWriter] = None
  19. self.position_callbacks = []
  20. async def connect(self):
  21. """Connect to GRBL controller"""
  22. if self.debug:
  23. logger.debug("GRBLHandler is not connected in debug mode")
  24. return
  25. logger.info(f"Connecting to GRBL ({self.port}, {self.baudrate})...")
  26. try:
  27. self.reader, self.writer = await serial_asyncio.open_serial_connection(
  28. url = self.port,
  29. baudrate = self.baudrate
  30. )
  31. except serial_asyncio.serial.SerialException as e:
  32. raise RuntimeError(f"Failed to connect to robot: {str(e)}")
  33. init_response = await self._process_response()
  34. if not any(isinstance(response, str) and "Grbl" in response for response in init_response):
  35. raise RuntimeError("Failed to connect to GRBL")
  36. logger.info("GRBL connected.")
  37. ####################
  38. # Set GRBL settings:
  39. ####################
  40. config_path = "robot_control/config/grbl_config.txt"
  41. if os.path.exists(config_path):
  42. logger.info(f"Loading GRBL config from {config_path}")
  43. with open(config_path, "r") as f:
  44. config_lines = [line.strip() for line in f if line.strip() and not line.strip().startswith("#")]
  45. if config_lines:
  46. await self.send_gcode(config_lines)
  47. else:
  48. logger.warning(f"GRBL config file not found: {config_path}")
  49. # Run homing cycle ($H)
  50. # Set absolute positioning mode (G90)
  51. # Set units to millimeters (G21)
  52. await self.send_gcode(['$H'], HOMING_TIMEOUT_S)
  53. await self.send_gcode(['G90', 'G21'])
  54. # if current pos not 0 set working pos to current pos
  55. # (should not be done every time to save eeprom writes)
  56. status_response = await self.get_status()
  57. current_pos = (0.0, 0.0, 0.0)
  58. if status_response and 'WPos:' in status_response:
  59. pos_str = status_response.split('WPos:')[1].split('|')[0]
  60. x, y, z = map(float, pos_str.split(','))
  61. current_pos = (x, y, z)
  62. # If not at (0,0,0), set working position to current position
  63. if any(abs(coord) > 1e-6 for coord in current_pos):
  64. await self.send_gcode([f'G10 L20 P1 X0 Y0 Z0'])
  65. logger.info(f"Set working position to current position: {current_pos}")
  66. async def reset(self):
  67. """Reset GRBL controller"""
  68. if self.debug:
  69. logger.debug("GRBLHandler is not reset in debug mode")
  70. return
  71. if self.writer:
  72. self.writer.write(b"\x18") # Soft-reset GRBL
  73. await self.writer.drain()
  74. await asyncio.sleep(0.5) # Wait for GRBL to reset
  75. await self.close()
  76. await self.connect() # Reconnect after reset
  77. async def reset_usb(self):
  78. """
  79. Power cycle the USB port using uhubctl.
  80. Requires uhubctl to be installed and accessible.
  81. """
  82. if self.debug:
  83. logger.debug("USB reset skipped in debug mode")
  84. return
  85. cmd = ["uhubctl", "-l", "1-1", "-a", "cycle", "-d", "2"]
  86. logger.info(f"Resetting USB with: {' '.join(cmd)}")
  87. await asyncio.sleep(0.5) # Wait for GRBL to reset
  88. await self.connect() # Reconnect after reset
  89. try:
  90. result = subprocess.run(cmd, capture_output=True, text=True, check=True)
  91. logger.info(f"USB reset output: {result.stdout.strip()}")
  92. except subprocess.CalledProcessError as e:
  93. logger.error(f"Failed to reset USB: {e.stderr.strip()}")
  94. raise RuntimeError(f"USB reset failed: {e.stderr.strip()}")
  95. async def send_gcode(self, commands: List[str], timeout_s = 6):
  96. """Send GCODE commands"""
  97. if self.debug:
  98. logger.debug(f"G-Code commands [{*commands,}] not sent in debug mode")
  99. return
  100. if not self.writer:
  101. logger.error("Writer is not initialized. Cannot send G-Code commands.")
  102. return
  103. self.controller_active.set()
  104. try:
  105. for cmd in commands:
  106. logger.debug(f"Sending G-Code command: {cmd}")
  107. self.writer.write(f"{cmd}\n".encode())
  108. await self.writer.drain()
  109. if not (await self._process_response(timeout_s)):
  110. raise RuntimeError("Did not receive response from GRBL")
  111. except RuntimeError as e:
  112. logger.error(f"Failed to send G-Code commands: {str(e)}")
  113. finally:
  114. self.controller_active.clear()
  115. async def get_status(self):
  116. """Get current GRBL status
  117. Returns:
  118. str: Status response from GRBL, or None if in debug mode
  119. """
  120. if self.debug:
  121. return None
  122. if not self.writer:
  123. return None
  124. response_lines = None
  125. try:
  126. self.writer.write(b"?") # '?' command returns status report
  127. await self.writer.drain()
  128. response_lines = str(await self._process_response())
  129. except Exception as e:
  130. logger.error(f"Failed to get status: {str(e)}")
  131. return None
  132. if not response_lines:
  133. return None
  134. for line in response_lines:
  135. if not 'ALARM' in line:
  136. continue
  137. if ":1" in line:
  138. raise RuntimeError("Hard Limit was triggered!")
  139. raise RuntimeError(f"Grbl threw ALARM: {line}")
  140. return response_lines
  141. async def wait_until_idle(self, timeout_s, position: Optional[tuple[float,float,float]] = None):
  142. """Wait until GRBL reports idle status
  143. Args:
  144. timeout_s: Timeout in seconds
  145. position: Optional list of 3 floats that will be updated with current position
  146. """
  147. if self.debug:
  148. await asyncio.sleep(1)
  149. return
  150. start = datetime.datetime.now()
  151. # response = await self.get_status() # First response can still be idle
  152. while True:
  153. response = await self.get_status()
  154. if response and 'WPos:' in response:
  155. # Parse position from status reports (<Idle|WPos:0.000,0.000,0.000|...>)
  156. pos_str = response.split('WPos:')[1].split('|')[0]
  157. if position is not None:
  158. x, y, z = map(float, pos_str.split(','))
  159. position = (x,y,z)
  160. if response and "Idle" in response:
  161. logger.debug("Movement complete.\nContinuing...")
  162. break
  163. now = datetime.datetime.now()
  164. if (now - start).total_seconds() > timeout_s:
  165. logger.error("Waiting on idle took too long!")
  166. raise TimeoutError("GRBL did not report idle status")
  167. await asyncio.sleep(0.5) # Async delay to prevent flooding
  168. async def send_and_wait_gcode(self, commands: List[str], timeout_s=60, position: Optional[tuple[float,float,float]] = None):
  169. """Send GCODE commands and wait until machine is idle"""
  170. await self.send_gcode(commands)
  171. await asyncio.sleep(0.2) # Delay to allow GRBL to process commands
  172. await self.wait_until_idle(timeout_s, position)
  173. def _check_pos_callbacks(self, line):
  174. # Parse position from status reports (<Idle|WPos:0.000,0.000,0.000|...>)
  175. if line.startswith('<') and 'WPos:' in line:
  176. pos_str = line.split('WPos:')[1].split('|')[0]
  177. x, y, z = map(float, pos_str.split(','))
  178. self.current_position = (x,y,z)
  179. # Notify all registered callbacks
  180. for callback in self.position_callbacks:
  181. callback(self.current_position)
  182. async def _process_response(self, timeout_s=6.0) -> list[str]:
  183. """Process GRBL responses"""
  184. response_lines : list[str] = []
  185. if not self.reader:
  186. return response_lines
  187. first = True
  188. while True:
  189. try:
  190. current_timeout = timeout_s if first else 0.1
  191. line = await asyncio.wait_for(self.reader.readuntil(), timeout=current_timeout)
  192. decoded = line.strip().decode("utf-8")
  193. self._check_pos_callbacks(decoded)
  194. logger.debug(f"G-Code response: {decoded}")
  195. response_lines.append(decoded)
  196. first = False
  197. except asyncio.TimeoutError:
  198. # No more data available right now
  199. break
  200. except Exception as e:
  201. raise RuntimeError(f"Failed to process response: {e}")
  202. if not response_lines:
  203. logger.warning(f"No GRBL response received! ({timeout_s}s)")
  204. return response_lines
  205. def register_position_callback(self, callback):
  206. """Register callback for position updates
  207. Args:
  208. callback: Function taking tuple with X,Y,Z coordinates
  209. """
  210. self.position_callbacks.append(callback)
  211. async def close(self):
  212. """Close connection"""
  213. if self.writer:
  214. self.writer.close()
  215. await self.writer.wait_closed()
  216. self.reader = None
  217. self.writer = None