| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161 |
- from abc import ABC, abstractmethod
- import pigpio
- import time
- class GPIOInterface(ABC):
- """
- Interface for handling communication to the GPIO pins.
- out_pins are set low when program is aborted.
- """
- def __init__(self, in_pins: list=[], out_pins: list=[]):
- self.out_pins = out_pins
- self.in_pins = in_pins
- @abstractmethod
- def set_pin(self, pin: int, value: int) -> None:
- pass
-
- @abstractmethod
- def get_pin(self, pin: int) -> int:
- pass
- @abstractmethod
- def set_servo_angle(self, pin: int, angle: float) -> None:
- """
- Set the servo connected to 'pin' to the specified angle.
- """
- pass
- @abstractmethod
- def set_servo_angle_smooth(self, pin: int, target_angle: float, duration_ms: float) -> None:
- """
- Move the servo to 'target_angle' at the specified speed (duration in ms).
- """
- pass
- def do_step(self, dir_pin: int, step_pin: int, steps: int = 100, step_delay_us: int = 200, direction: bool = True):
- """
- Perform a step operation on a stepper motor.
- """
- pass
-
- def cleanup(self):
- pass
- class PiGPIO(GPIOInterface):
- def __init__(self, in_pins: list=[], out_pins: list=[]):
- super().__init__(in_pins, out_pins)
- self.pi = pigpio.pi()
- if not self.pi.connected:
- raise RuntimeError("Could not connect to pigpiod. Is it running? Try: 'sudo systemctl start pigpiod'")
- for pin in out_pins:
- self.pi.set_mode(pin, pigpio.OUTPUT)
- self.pi.write(pin, 0) # Ensure output pins are off initially
- for pin in in_pins:
- self.pi.set_mode(pin, pigpio.INPUT)
- def set_pin(self, pin: int, value: int) -> None:
- if pin < 0 or pin > 27:
- return
- if pin not in self.out_pins:
- self.pi.set_mode(pin, pigpio.OUTPUT)
- self.out_pins.append(pin)
- self.pi.write(pin, value)
- def get_pin(self, pin: int) -> int:
- if pin < 0 or pin > 27:
- return -1
- if pin not in self.in_pins:
- self.pi.set_mode(pin, pigpio.INPUT)
- self.in_pins.append(pin)
- return self.pi.read(pin)
- def set_servo_angle(self, pin: int, angle_deg: float) -> None:
- """
- Set servo to a specific angle (0-180 degrees).
- MG90 typical pulse width: 0 deg = 500us, 180 deg = 2500us.
- """
- if pin not in self.out_pins:
- self.pi.set_mode(pin, pigpio.OUTPUT)
- self.out_pins.append(pin)
- # Clamp angle
- angle_deg = max(0, min(180, angle_deg))
- # Map angle to pulse width
- pulsewidth = int(500 + (angle_deg / 180.0) * 2000)
- self.pi.set_servo_pulsewidth(pin, pulsewidth)
- def set_servo_angle_smooth(self, pin: int, target_angle_deg: float, duration_ms: float) -> None:
- """
- Move servo to target_angle at given speed (duration in ms).
- """
- if pin < 0 or pin > 27 or duration_ms <= 0:
- return
- # Read current angle by assuming last set value (no feedback)
- # For simplicity, we store last angle in an instance dict
- if not hasattr(self, "_servo_angles"):
- self._servo_angles:dict[int, float] = {}
- if pin not in self._servo_angles:
- self.set_servo_angle(pin, target_angle_deg)
- return
- current_angle = self._servo_angles[pin]
- target_angle_deg = max(0, min(180, target_angle_deg))
- step = 1 if target_angle_deg > current_angle else -1
- delay = duration_ms / int(abs(target_angle_deg-current_angle)) / 1000
- for angle in range(int(current_angle), int(target_angle_deg), step):
- self.set_servo_angle(pin, angle)
- time.sleep(delay)
- self.set_servo_angle(pin, target_angle_deg)
- self._servo_angles[pin] = target_angle_deg
- def do_step(self, dir_pin: int, step_pin: int, steps: int = 100, step_delay_us: int = 200, direction: bool = True):
- """
- Perform a step operation on a stepper motor.
- dir_pin: Direction pin
- step_pin: Step pin to trigger steps
- steps: Number of steps to perform
- step_delay_us: Delay between steps in microseconds
- direction: True for forward, False for reverse
- """
- # Set direction
- self.set_pin(dir_pin, 1 if direction else 0)
- # Create pulse waveform
- pulses = []
- for _ in range(steps):
- pulses.append(pigpio.pulse(1 << step_pin, 0, step_delay_us)) # STEP high
- pulses.append(pigpio.pulse(0, 1 << step_pin, step_delay_us)) # STEP low
- self.pi.wave_clear()
- self.pi.wave_add_generic(pulses)
- wave_id = self.pi.wave_create()
- if wave_id >= 0:
- self.pi.wave_send_once(wave_id)
- while self.pi.wave_tx_busy():
- time.sleep(0.1)
- self.pi.wave_delete(wave_id)
- self.set_pin(dir_pin, 0)
- def cleanup(self):
- if self.pi.connected:
- for pin in self.out_pins:
- self.set_pin(pin, 0)
- self.pi.stop()
- class MockGPIO(GPIOInterface):
- def set_pin(self, pin: int, value: int) -> None:
- pass
- def get_pin(self, pin: int) -> int:
- return 0
-
- def set_servo_angle(self, pin: int, angle: float) -> None:
- pass
- def set_servo_angle_smooth(self, pin: int, target_angle: float, duration_ms: float) -> None:
- pass
|