| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166 |
import asyncio
import collections
import datetime
from typing import Callable, List, Optional, Tuple

import serial_asyncio

from robot_control.src.utils.logging import LoggerSingleton
# Tuning constants. None of them is referenced by GRBLHandler in this file section.
# NOTE(review): units are not stated here - the *_RATE values are presumably
# G-code feed rates and the radius a length in machine units; confirm with callers.
MOVE_RATE = 50
# NOTE(review): the spelling "APPROACHE" is kept because other modules may import it.
APPROACHE_RATE = 10
CELL_CIRCLE_RADIUS = 0.35

# Project-wide logger instance used by GRBLHandler for all of its log output.
logger = LoggerSingleton.get_logger()
class GRBLHandler:
    """Asynchronous serial link to a GRBL controller.

    Commands are written line by line over a ``serial_asyncio`` stream and one
    response line is awaited per command. Status reports that contain an
    ``MPos:`` field update ``current_position`` and are forwarded to every
    registered position callback.

    Passing the literal port name ``"debug"`` puts the handler in debug mode:
    no serial port is opened and every I/O method returns without touching
    the hardware.
    """

    # Seconds to wait for a single response line before giving up on it.
    RESPONSE_TIMEOUT_S = 4.0

    def __init__(self, port: str, baudrate: int):
        """Store connection settings; no I/O happens until ``connect()``.

        Args:
            port: Serial port URL, or ``"debug"`` to run without hardware.
            baudrate: Serial baud rate handed to ``open_serial_connection``.
        """
        self.port = port
        self.debug = port == "debug"
        self.baudrate = baudrate

        # Set while send_gcode() is streaming commands, cleared when it is done.
        self.controller_active = asyncio.Event()

        self.reader = None
        self.writer = None
        self.position_callbacks: List[Callable[[Tuple[float, float, float]], None]] = []
        # Last position parsed from a status report; None until one was received.
        self.current_position: Optional[Tuple[float, float, float]] = None

    async def connect(self):
        """Connect to GRBL controller

        Opens the serial stream, reads the first three response lines and
        requires one of them to contain ``"Grbl"``, then sends the initial
        configuration commands.

        Raises:
            serial_asyncio.serial.SerialException: The port could not be opened.
            RuntimeError: None of the first three lines contained ``"Grbl"``.
        """
        if self.debug:
            logger.debug("GRBLHandler is not connected in debug mode")
            return

        logger.info("Connecting to GRBL...")
        try:
            self.reader, self.writer = await serial_asyncio.open_serial_connection(
                url=self.port,
                baudrate=self.baudrate,
            )
        except serial_asyncio.serial.SerialException as e:
            raise serial_asyncio.serial.SerialException(
                f"Failed to connect to robot: {str(e)}"
            ) from e

        # Flush initial responses. A read that timed out yields None, so only
        # real text lines are inspected for the banner.
        init_response = [await self._process_response() for _ in range(3)]
        if not any(isinstance(line, str) and "Grbl" in line for line in init_response):
            raise RuntimeError("Failed to connect to GRBL")

        logger.info("GRBL connected.")
        # $22=1 enables homing, G90 selects absolute positioning and G21
        # millimeter units. '$X' is sent in place of the '$H' homing cycle,
        # so homing is skipped on connect.
        await self.send_gcode(['$22=1', '$X', 'G90', 'G21'])

    async def send_gcode(self, commands: List[str]):
        """Send GCODE commands

        Each command is written on its own line and one response line is
        awaited before the next command goes out. If a response is missing,
        the remaining commands are not sent and the failure is logged; it is
        not raised to the caller.

        Args:
            commands: G-code / GRBL command strings without line terminators.
        """
        if self.debug:
            logger.debug(f"G-Code commands [{*commands,}] not sent in debug mode")
            return

        self.controller_active.set()
        try:
            for cmd in commands:
                logger.debug(f"Sending G-Code command: {cmd}")
                self.writer.write(f"{cmd}\n".encode())
                await self.writer.drain()
                if not (await self._process_response()):
                    raise RuntimeError("Did not receive response from GRBL")
        except RuntimeError as e:
            logger.error(f"Failed to send G-Code commands: {str(e)}")
        finally:
            self.controller_active.clear()

    async def get_status(self):
        """Get current GRBL status

        Returns:
            str: The response lines that arrived after the '?' query, joined
            together, or None in debug mode, when not connected, or when no
            line arrived in time.
        """
        if self.debug or not self.writer:
            return None

        self.writer.write(b"?\n")
        await self.writer.drain()
        # '?' command returns status report and 'ok'
        report = await self._process_response()
        ack = await self._process_response()
        # Either read may have timed out (None); join only what was received.
        received = [line for line in (report, ack) if line is not None]
        return "".join(received) if received else None

    async def wait_until_idle(self, timeout_s, position: Optional[List[float]] = None):
        """Wait until GRBL reports idle status

        Polls the status every 0.2 s. In debug mode it sleeps one second and
        returns.

        Args:
            timeout_s: Timeout in seconds
            position: Optional list of 3 floats that is updated in place with
                the current position from each status report

        Raises:
            TimeoutError: No idle status was reported within ``timeout_s``.
        """
        if self.debug:
            await asyncio.sleep(1)
            return

        # The loop clock is monotonic, so wall-clock jumps cannot distort the timeout.
        loop = asyncio.get_running_loop()
        start = loop.time()
        while True:
            response = await self.get_status()
            if response and 'MPos:' in response and position is not None:
                # Slice assignment so the caller's list object sees the update.
                position[:] = self._parse_mpos(response)
            if response and "Idle" in response:
                logger.debug("Movement complete.\nContinuing...")
                break

            if loop.time() - start > timeout_s:
                logger.error("Waiting on idle took too long!")
                raise TimeoutError("GRBL did not report idle status")

            await asyncio.sleep(0.2)  # Async delay to prevent flooding

    async def send_and_wait_gcode(self, commands: List[str], timeout_s=60,
                                  position: Optional[List[float]] = None):
        """Send GCODE commands and wait until machine is idle

        Args:
            commands: Commands passed to ``send_gcode``.
            timeout_s: Timeout in seconds passed to ``wait_until_idle``.
            position: Optional list passed to ``wait_until_idle``.
        """
        await self.send_gcode(commands)
        await asyncio.sleep(0.2)  # Delay to allow GRBL to process commands
        await self.wait_until_idle(timeout_s, position)

    @staticmethod
    def _parse_mpos(report: str) -> Tuple[float, float, float]:
        """Extract X, Y, Z from a report such as <Idle|MPos:0.000,0.000,0.000|...>.

        Raises:
            ValueError: The field does not hold exactly three numbers.
        """
        # The field ends at the next '|' or, when it is the last one, at '>'.
        field = report.split('MPos:')[1].split('|')[0].split('>')[0]
        x, y, z = map(float, field.split(','))
        return (x, y, z)

    async def _process_response(self):
        """Process GRBL responses

        Reads one line from the serial stream. If it is a status report with
        an ``MPos:`` field, ``current_position`` is updated and every
        registered callback is called with the new position.

        Returns:
            str: The stripped, decoded line, or None when there is no reader
            or no line arrived within ``RESPONSE_TIMEOUT_S`` seconds.

        Raises:
            RuntimeError: Reading, decoding, parsing or a callback failed.
        """
        if not self.reader:
            return None

        try:
            response = await asyncio.wait_for(
                self.reader.readuntil(), timeout=self.RESPONSE_TIMEOUT_S
            )
            decoded = response.strip().decode("utf-8")
            logger.debug(f"G-Code response: {decoded}")

            if decoded.startswith('<') and 'MPos:' in decoded:
                self.current_position = self._parse_mpos(decoded)

                # Notify all registered callbacks
                for callback in self.position_callbacks:
                    callback(self.current_position)

            return decoded
        except asyncio.TimeoutError:
            # A missing line is reported as None; callers decide what to do.
            logger.warning("Timeout waiting for GRBL response")
            return None
        except Exception as e:
            raise RuntimeError(f"Failed to process response: {e}") from e

    def register_position_callback(self, callback):
        """Register callback for position updates

        Args:
            callback: Function taking tuple with X,Y,Z coordinates
        """
        self.position_callbacks.append(callback)

    async def close(self):
        """Close connection"""
        if self.writer:
            self.writer.close()
            await self.writer.wait_closed()
|