from abc import ABC, abstractmethod import pigpio import time class GPIOInterface(ABC): """ Interface for handling communication to the GPIO pins. out_pins are set low when program is aborted. """ def __init__(self, in_pins: list=[], out_pins: list=[]): self.out_pins = out_pins self.in_pins = in_pins @abstractmethod def set_pin(self, pin: int, value: int) -> None: pass @abstractmethod def get_pin(self, pin: int) -> int: pass @abstractmethod def set_servo_angle(self, pin: int, angle: float) -> None: """ Set the servo connected to 'pin' to the specified angle. """ pass @abstractmethod def set_servo_angle_smooth(self, pin: int, target_angle: float, duration_ms: float) -> None: """ Move the servo to 'target_angle' at the specified speed (duration in ms). """ pass def do_step(self, dir_pin: int, step_pin: int, steps: int = 100, step_delay_us: int = 200, direction: bool = True): """ Perform a step operation on a stepper motor. """ pass def cleanup(self): pass class PiGPIO(GPIOInterface): def __init__(self, in_pins: list=[], out_pins: list=[]): super().__init__(in_pins, out_pins) self.pi = pigpio.pi() if not self.pi.connected: raise RuntimeError("Could not connect to pigpiod. Is it running? Try: 'sudo systemctl start pigpiod'") for pin in out_pins: self.pi.set_mode(pin, pigpio.OUTPUT) self.pi.write(pin, 0) # Ensure output pins are off initially for pin in in_pins: self.pi.set_mode(pin, pigpio.INPUT) def set_pin(self, pin: int, value: int) -> None: if pin < 0 or pin > 27: return if pin not in self.out_pins: self.pi.set_mode(pin, pigpio.OUTPUT) self.out_pins.append(pin) self.pi.write(pin, value) def get_pin(self, pin: int) -> int: if pin < 0 or pin > 27: return -1 if pin not in self.in_pins: self.pi.set_mode(pin, pigpio.INPUT) self.in_pins.append(pin) return self.pi.read(pin) def set_servo_angle(self, pin: int, angle_deg: float) -> None: """ Set servo to a specific angle (0-180 degrees). MG90 typical pulse width: 0 deg = 500us, 180 deg = 2500us. """ if pin not in self.out_pins: self.pi.set_mode(pin, pigpio.OUTPUT) self.out_pins.append(pin) # Clamp angle angle_deg = max(0, min(180, angle_deg)) # Map angle to pulse width pulsewidth = int(500 + (angle_deg / 180.0) * 2000) self.pi.set_servo_pulsewidth(pin, pulsewidth) def set_servo_angle_smooth(self, pin: int, target_angle_deg: float, duration_ms: float) -> None: """ Move servo to target_angle at given speed (duration in ms). """ if pin < 0 or pin > 27 or duration_ms <= 0: return # Read current angle by assuming last set value (no feedback) # For simplicity, we store last angle in an instance dict if not hasattr(self, "_servo_angles"): self._servo_angles:dict[int, float] = {} if pin not in self._servo_angles: self.set_servo_angle(pin, target_angle_deg) return current_angle = self._servo_angles[pin] target_angle_deg = max(0, min(180, target_angle_deg)) step = 1 if target_angle_deg > current_angle else -1 delay = duration_ms / int(abs(target_angle_deg-current_angle)) / 1000 for angle in range(int(current_angle), int(target_angle_deg), step): self.set_servo_angle(pin, angle) time.sleep(delay) self.set_servo_angle(pin, target_angle_deg) self._servo_angles[pin] = target_angle_deg def do_step(self, dir_pin: int, step_pin: int, steps: int = 100, step_delay_us: int = 200, direction: bool = True): """ Perform a step operation on a stepper motor. dir_pin: Direction pin step_pin: Step pin to trigger steps steps: Number of steps to perform step_delay_us: Delay between steps in microseconds direction: True for forward, False for reverse """ # Set direction self.set_pin(dir_pin, 1 if direction else 0) # Create pulse waveform pulses = [] for _ in range(steps): pulses.append(pigpio.pulse(1 << step_pin, 0, step_delay_us)) # STEP high pulses.append(pigpio.pulse(0, 1 << step_pin, step_delay_us)) # STEP low self.pi.wave_clear() self.pi.wave_add_generic(pulses) wave_id = self.pi.wave_create() if wave_id >= 0: self.pi.wave_send_once(wave_id) while self.pi.wave_tx_busy(): time.sleep(0.1) self.pi.wave_delete(wave_id) self.set_pin(dir_pin, 0) def cleanup(self): if self.pi.connected: for pin in self.out_pins: self.set_pin(pin, 0) self.pi.stop() class MockGPIO(GPIOInterface): def set_pin(self, pin: int, value: int) -> None: pass def get_pin(self, pin: int) -> int: return 0 def set_servo_angle(self, pin: int, angle: float) -> None: pass def set_servo_angle_smooth(self, pin: int, target_angle: float, duration_ms: float) -> None: pass