grbl_handler.py 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166
import asyncio
import collections
import datetime
from typing import List, Optional

import serial_asyncio

from robot_control.src.utils.logging import LoggerSingleton
# Motion/geometry constants. None of them is referenced by the code visible in
# this module; meanings and units below are assumptions — TODO confirm against
# the modules that import them.
MOVE_RATE = 50  # presumably the feed rate used for ordinary travel moves
APPROACHE_RATE = 10  # presumably a slower feed rate for approach moves; spelling kept because the name is public
CELL_CIRCLE_RADIUS = 0.35  # presumably a circle radius in machine units — verify

# Module-level logger obtained from the project's LoggerSingleton; used by
# GRBLHandler for all of its log output.
logger = LoggerSingleton.get_logger()
  11. class GRBLHandler:
  12. def __init__(self, port: str, baudrate: int):
  13. self.port = port
  14. self.debug = port == "debug"
  15. self.baudrate = baudrate
  16. self.controller_active = asyncio.Event()
  17. self.reader = None
  18. self.writer = None
  19. self.position_callbacks = []
  20. async def connect(self):
  21. """Connect to GRBL controller"""
  22. if self.debug:
  23. logger.debug("GRBLHandler is not connected in debug mode")
  24. return
  25. logger.info("Connecting to GRBL...")
  26. try:
  27. self.reader, self.writer = await serial_asyncio.open_serial_connection(
  28. url = self.port,
  29. baudrate = self.baudrate
  30. )
  31. except serial_asyncio.serial.SerialException as e:
  32. raise serial_asyncio.serial.SerialException(f"Failed to connect to robot: {str(e)}")
  33. init_response = []
  34. for _ in range(3): # Flush initial responses
  35. init_response.append(await self._process_response())
  36. if not any(isinstance(response, collections.Iterable) and "Grbl" in response for response in init_response):
  37. raise RuntimeError("Failed to connect to GRBL")
  38. logger.info("GRBL connected.")
  39. # Enable homing ($22=1)
  40. # Run homing cycle ($H)
  41. # Set absolute positioning mode (G90)
  42. # Set units to millimeters (G21)
  43. # await self.send_gcode(['$22=1', '$H', 'G90', 'G21'])
  44. await self.send_gcode(['$22=1', '$X', 'G90', 'G21']) # This skips homing
  45. async def send_gcode(self, commands: List[str]):
  46. """Send GCODE commands"""
  47. if self.debug:
  48. logger.debug(f"G-Code commands [{*commands,}] not sent in debug mode")
  49. return
  50. self.controller_active.set()
  51. try:
  52. for cmd in commands:
  53. logger.debug(f"Sending G-Code command: {cmd}")
  54. self.writer.write(f"{cmd}\n".encode())
  55. await self.writer.drain()
  56. if not (await self._process_response()):
  57. raise RuntimeError("Did not receive response from GRBL")
  58. except RuntimeError as e:
  59. logger.error(f"Failed to send G-Code commands: {str(e)}")
  60. finally:
  61. self.controller_active.clear()
  62. async def get_status(self):
  63. """Get current GRBL status
  64. Returns:
  65. str: Status response from GRBL, or None if in debug mode
  66. """
  67. if self.debug:
  68. return None
  69. if self.writer:
  70. self.writer.write(b"?\n")
  71. await self.writer.drain()
  72. # '?' command returns status report and 'ok'
  73. response = await self._process_response()
  74. response = response + await self._process_response()
  75. return response
  76. return None
  77. async def wait_until_idle(self, timeout_s, position: list[float] = None):
  78. """Wait until GRBL reports idle status
  79. Args:
  80. timeout_s: Timeout in seconds
  81. position: Optional list of 3 floats that will be updated with current position
  82. """
  83. if self.debug:
  84. await asyncio.sleep(1)
  85. return
  86. start = datetime.datetime.now()
  87. while True:
  88. response = await self.get_status()
  89. if response and 'MPos:' in response:
  90. # Parse position from status reports (<Idle|MPos:0.000,0.000,0.000|...>)
  91. pos_str = response.split('MPos:')[1].split('|')[0]
  92. if position is not None:
  93. x, y, z = map(float, pos_str.split(','))
  94. position = (x,y,z)
  95. if response and "Idle" in response:
  96. logger.debug("Movement complete.\nContinuing...")
  97. break
  98. now = datetime.datetime.now()
  99. if (now - start).total_seconds() > timeout_s:
  100. logger.error("Waiting on idle took too long!")
  101. raise TimeoutError("GRBL did not report idle status")
  102. await asyncio.sleep(0.2) # Async delay to prevent flooding
  103. async def send_and_wait_gcode(self, commands: List[str], timeout_s=60, position: list[float] = None):
  104. """Send GCODE commands and wait until machine is idle"""
  105. await self.send_gcode(commands)
  106. await asyncio.sleep(0.2) # Delay to allow GRBL to process commands
  107. await self.wait_until_idle(timeout_s, position)
  108. async def _process_response(self):
  109. """Process GRBL responses"""
  110. if self.reader:
  111. try:
  112. response = await asyncio.wait_for(self.reader.readuntil(), timeout=4.0) # 2 second timeout
  113. decoded = response.strip().decode("utf-8")
  114. logger.debug(f"G-Code response: {decoded}")
  115. # Parse position from status reports (<Idle|MPos:0.000,0.000,0.000|...>)
  116. if decoded.startswith('<') and 'MPos:' in decoded:
  117. pos_str = decoded.split('MPos:')[1].split('|')[0]
  118. x, y, z = map(float, pos_str.split(','))
  119. self.current_position = (x,y,z)
  120. # Notify all registered callbacks
  121. for callback in self.position_callbacks:
  122. callback(self.current_position)
  123. return decoded
  124. except asyncio.TimeoutError as e:
  125. logger.warning("Timeout waiting for GRBL response")
  126. except Exception as e:
  127. raise RuntimeError(f"Failed to process response: {e}")
  128. def register_position_callback(self, callback):
  129. """Register callback for position updates
  130. Args:
  131. callback: Function taking tuple with X,Y,Z coordinates
  132. """
  133. self.position_callbacks.append(callback)
  134. async def close(self):
  135. """Close connection"""
  136. if self.writer:
  137. self.writer.close()
  138. await self.writer.wait_closed()