|
|
@@ -1,23 +1,33 @@
|
|
|
from typing import Optional
|
|
|
import time
|
|
|
import asyncio
|
|
|
+import pigpio
|
|
|
from robot_control.src.api.grbl_handler import GRBLHandler
|
|
|
from robot_control.src.utils.logging import LoggerSingleton
|
|
|
+from robot_control.src.utils.config import ConfigParser
|
|
|
|
|
|
|
|
|
logger = LoggerSingleton.get_logger()
|
|
|
|
|
|
class RobotMovement:
|
|
|
- def __init__(self, grbl: GRBLHandler):
|
|
|
- self.grbl = grbl
|
|
|
+ def __init__(self, grbl: GRBLHandler, config: ConfigParser):
|
|
|
+ self.pi = pigpio.pi()
|
|
|
+ self.config = config
|
|
|
+ vacuum_config = config.get_vacuum_config()
|
|
|
+ self.valve_gpio_pin = vacuum_config.get("valve_gpio_pin")
|
|
|
+ self.pi.set_mode(self.valve_gpio_pin, pigpio.OUTPUT)
|
|
|
+ self.pi.write(self.valve_gpio_pin, 0) # Ensure pump is off initially
|
|
|
+
|
|
|
+ sys_config = config.get_system_settings()
|
|
|
+ self.speed: float = sys_config.speed
|
|
|
+
|
|
|
+ self.home_position: tuple[float, float, float] = (0.0, 0.0, 0.0)
|
|
|
self.current_position: tuple[float, float, float] = (0.0, 0.0, 0.0)
|
|
|
- self.speed: float = 100.0 # TODO [SG]: Set at beginning?
|
|
|
self.is_moving: bool = False
|
|
|
- self.home_position: tuple[float, float, float] = (0.0, 0.0, 0.0)
|
|
|
self.endeffector_active = False
|
|
|
- self.pump_speed = 1000 # Default pump speed/power
|
|
|
|
|
|
# Register for position updates
|
|
|
+ self.grbl = grbl
|
|
|
self.grbl.register_position_callback(self._position_callback)
|
|
|
|
|
|
def _position_callback(self, pos: dict):
|
|
|
@@ -70,16 +80,10 @@ class RobotMovement:
|
|
|
def get_position(self) -> tuple[float, float, float]:
|
|
|
return self.current_position
|
|
|
|
|
|
- async def activate_endeffector(self, speed: Optional[int] = None) -> bool:
|
|
|
- """Activate the pump end effector.
|
|
|
-
|
|
|
- Args:
|
|
|
- speed: Optional pump speed (0-1000)
|
|
|
- """
|
|
|
+ async def activate_endeffector(self) -> bool:
|
|
|
+ """Activate the pump end effector."""
|
|
|
try:
|
|
|
- if speed is not None:
|
|
|
- self.pump_speed = max(0, min(1000, speed))
|
|
|
- await self.grbl.send_gcode([f'M3 S{self.pump_speed}'])
|
|
|
+ self.pi.write(self.valve_gpio_pin, 1)
|
|
|
self.endeffector_active = True
|
|
|
logger.debug("End effector activated")
|
|
|
return True
|
|
|
@@ -90,7 +94,7 @@ class RobotMovement:
|
|
|
async def deactivate_endeffector(self) -> bool:
|
|
|
"""Deactivate the pump end effector."""
|
|
|
try:
|
|
|
- await self.grbl.send_gcode(['M5'])
|
|
|
+ self.pi.write(self.valve_gpio_pin, 0)
|
|
|
self.endeffector_active = False
|
|
|
logger.debug("End effector deactivated")
|
|
|
return True
|