grbl_handler.py 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226
  1. import asyncio
  2. import serial_asyncio
  3. from typing import List
  4. import logging
  5. import datetime
  6. import collections
  7. MOVE_RATE = 50
  8. APPROACHE_RATE = 10
  9. CELL_CIRCLE_RADIUS = 0.35
  10. HOMING_TIMEOUT_S = 80
  11. logger = logging.getLogger(__name__)
  12. class GRBLHandler:
  13. def __init__(self, port: str, baudrate: int):
  14. self.port = port
  15. self.debug = port == "debug"
  16. self.baudrate = baudrate
  17. self.controller_active = asyncio.Event()
  18. self.reader = None
  19. self.writer = None
  20. self.position_callbacks = []
  21. async def connect(self):
  22. """Connect to GRBL controller"""
  23. if self.debug:
  24. logger.debug("GRBLHandler is not connected in debug mode")
  25. return
  26. logger.info("Connecting to GRBL...")
  27. try:
  28. self.reader, self.writer = await serial_asyncio.open_serial_connection(
  29. url = self.port,
  30. baudrate = self.baudrate
  31. )
  32. except serial_asyncio.serial.SerialException as e:
  33. raise serial_asyncio.serial.SerialException(f"Failed to connect to robot: {str(e)}")
  34. init_response = await self._process_response()
  35. if not any(isinstance(response, collections.abc.Iterable) and "Grbl" in response for response in init_response):
  36. raise RuntimeError("Failed to connect to GRBL")
  37. logger.info("GRBL connected.")
  38. # Set GRBL settings:
  39. # Enable homing ($22=1)
  40. # Run homing cycle ($H)
  41. # Set absolute positioning mode (G90)
  42. # Set units to millimeters (G21)
  43. await self.send_gcode(['$22=1'])
  44. await self.send_gcode(['$H'], HOMING_TIMEOUT_S)
  45. await self.send_gcode(['G90', 'G21'])
  46. # if current pos not 0 set working pos to current pos
  47. # (should not be done every time to save eeprom writes)
  48. status_response = await self.get_status()
  49. current_pos = (0.0, 0.0, 0.0)
  50. if status_response and 'WPos:' in status_response:
  51. pos_str = status_response.split('WPos:')[1].split('|')[0]
  52. x, y, z = map(float, pos_str.split(','))
  53. current_pos = (x, y, z)
  54. # If not at (0,0,0), set working position to current position
  55. if any(abs(coord) > 1e-6 for coord in current_pos):
  56. await self.send_gcode([f'G10 L20 P1 X0 Y0 Z0'])
  57. logger.info(f"Set working position to current position: {current_pos}")
  58. async def reset(self):
  59. """Reset GRBL controller"""
  60. if self.debug:
  61. logger.debug("GRBLHandler is not reset in debug mode")
  62. return
  63. if self.writer:
  64. self.writer.write(b"\x18") # Soft-reset GRBL
  65. await self.writer.drain()
  66. await asyncio.sleep(0.5) # Wait for GRBL to reset
  67. await self.close()
  68. await self.connect() # Reconnect after reset
  69. async def send_gcode(self, commands: List[str], timeout_s = 6):
  70. """Send GCODE commands"""
  71. if self.debug:
  72. logger.debug(f"G-Code commands [{*commands,}] not sent in debug mode")
  73. return
  74. self.controller_active.set()
  75. try:
  76. for cmd in commands:
  77. logger.debug(f"Sending G-Code command: {cmd}")
  78. self.writer.write(f"{cmd}\n".encode())
  79. await self.writer.drain()
  80. if not (await self._process_response(timeout_s)):
  81. raise RuntimeError("Did not receive response from GRBL")
  82. except RuntimeError as e:
  83. logger.error(f"Failed to send G-Code commands: {str(e)}")
  84. finally:
  85. self.controller_active.clear()
  86. async def get_status(self):
  87. """Get current GRBL status
  88. Returns:
  89. str: Status response from GRBL, or None if in debug mode
  90. """
  91. if self.debug:
  92. return None
  93. if not self.writer:
  94. return None
  95. response_lines = None
  96. try:
  97. self.writer.write(b"?") # '?' command returns status report
  98. await self.writer.drain()
  99. response_lines = str(await self._process_response())
  100. except Exception as e:
  101. logger.error(f"Failed to get status: {str(e)}")
  102. return None
  103. if not response_lines:
  104. return None
  105. for line in response_lines:
  106. if not 'ALARM' in line:
  107. continue
  108. if ":1" in line:
  109. raise RuntimeError("Hard Limit was triggered!")
  110. raise RuntimeError(f"Grbl threw ALARM: {line}")
  111. return response_lines
  112. async def wait_until_idle(self, timeout_s, position: list[float] = None):
  113. """Wait until GRBL reports idle status
  114. Args:
  115. timeout_s: Timeout in seconds
  116. position: Optional list of 3 floats that will be updated with current position
  117. """
  118. if self.debug:
  119. await asyncio.sleep(1)
  120. return
  121. start = datetime.datetime.now()
  122. # response = await self.get_status() # First response can still be idle
  123. while True:
  124. response = await self.get_status()
  125. if response and 'WPos:' in response:
  126. # Parse position from status reports (<Idle|WPos:0.000,0.000,0.000|...>)
  127. pos_str = response.split('WPos:')[1].split('|')[0]
  128. if position is not None:
  129. x, y, z = map(float, pos_str.split(','))
  130. position = (x,y,z)
  131. if response and "Idle" in response:
  132. logger.debug("Movement complete.\nContinuing...")
  133. break
  134. now = datetime.datetime.now()
  135. if (now - start).total_seconds() > timeout_s:
  136. logger.error("Waiting on idle took too long!")
  137. raise TimeoutError("GRBL did not report idle status")
  138. await asyncio.sleep(0.5) # Async delay to prevent flooding
  139. async def send_and_wait_gcode(self, commands: List[str], timeout_s=60, position: list[float] = None):
  140. """Send GCODE commands and wait until machine is idle"""
  141. await self.send_gcode(commands)
  142. await asyncio.sleep(0.2) # Delay to allow GRBL to process commands
  143. await self.wait_until_idle(timeout_s, position)
  144. def _check_pos_callbacks(self, line):
  145. # Parse position from status reports (<Idle|WPos:0.000,0.000,0.000|...>)
  146. if line.startswith('<') and 'WPos:' in line:
  147. pos_str = line.split('WPos:')[1].split('|')[0]
  148. x, y, z = map(float, pos_str.split(','))
  149. self.current_position = (x,y,z)
  150. # Notify all registered callbacks
  151. for callback in self.position_callbacks:
  152. callback(self.current_position)
  153. async def _process_response(self, timeout_s=6.0):
  154. """Process GRBL responses"""
  155. if self.reader:
  156. response_lines = []
  157. first = True
  158. while True:
  159. try:
  160. current_timeout = timeout_s if first else 0.1
  161. line = await asyncio.wait_for(self.reader.readuntil(), timeout=current_timeout)
  162. decoded = line.strip().decode("utf-8")
  163. self._check_pos_callbacks(decoded)
  164. logger.debug(f"G-Code response: {decoded}")
  165. response_lines.append(decoded)
  166. first = False
  167. except asyncio.TimeoutError:
  168. # No more data available right now
  169. break
  170. except Exception as e:
  171. raise RuntimeError(f"Failed to process response: {e}")
  172. if not response_lines:
  173. logger.warning(f"No GRBL response received! ({timeout_s}s)")
  174. return response_lines
  175. def register_position_callback(self, callback):
  176. """Register callback for position updates
  177. Args:
  178. callback: Function taking tuple with X,Y,Z coordinates
  179. """
  180. self.position_callbacks.append(callback)
  181. async def close(self):
  182. """Close connection"""
  183. if self.writer:
  184. self.writer.close()
  185. await self.writer.wait_closed()
  186. self.reader = None
  187. self.writer = None