gpio.py 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161
  1. from abc import ABC, abstractmethod
  2. import pigpio
  3. import time
  4. class GPIOInterface(ABC):
  5. """
  6. Interface for handling communication to the GPIO pins.
  7. out_pins are set low when program is aborted.
  8. """
  9. def __init__(self, in_pins: list=[], out_pins: list=[]):
  10. self.out_pins = out_pins
  11. self.in_pins = in_pins
  12. @abstractmethod
  13. def set_pin(self, pin: int, value: int) -> None:
  14. pass
  15. @abstractmethod
  16. def get_pin(self, pin: int) -> int:
  17. pass
  18. @abstractmethod
  19. def set_servo_angle(self, pin: int, angle: float) -> None:
  20. """
  21. Set the servo connected to 'pin' to the specified angle.
  22. """
  23. pass
  24. @abstractmethod
  25. def set_servo_angle_smooth(self, pin: int, target_angle: float, duration_ms: float) -> None:
  26. """
  27. Move the servo to 'target_angle' at the specified speed (duration in ms).
  28. """
  29. pass
  30. def do_step(self, dir_pin: int, step_pin: int, steps: int = 100, step_delay_us: int = 200, direction: bool = True):
  31. """
  32. Perform a step operation on a stepper motor.
  33. """
  34. pass
  35. def cleanup(self):
  36. pass
  37. class PiGPIO(GPIOInterface):
  38. def __init__(self, in_pins: list=[], out_pins: list=[]):
  39. super().__init__(in_pins, out_pins)
  40. self.pi = pigpio.pi()
  41. if not self.pi.connected:
  42. raise RuntimeError("Could not connect to pigpiod. Is it running? Try: 'sudo systemctl start pigpiod'")
  43. for pin in out_pins:
  44. self.pi.set_mode(pin, pigpio.OUTPUT)
  45. self.pi.write(pin, 0) # Ensure output pins are off initially
  46. for pin in in_pins:
  47. self.pi.set_mode(pin, pigpio.INPUT)
  48. def set_pin(self, pin: int, value: int) -> None:
  49. if pin < 0 or pin > 27:
  50. return
  51. if pin not in self.out_pins:
  52. self.pi.set_mode(pin, pigpio.OUTPUT)
  53. self.out_pins.append(pin)
  54. self.pi.write(pin, value)
  55. def get_pin(self, pin: int) -> int:
  56. if pin < 0 or pin > 27:
  57. return -1
  58. if pin not in self.in_pins:
  59. self.pi.set_mode(pin, pigpio.INPUT)
  60. self.in_pins.append(pin)
  61. return self.pi.read(pin)
  62. def set_servo_angle(self, pin: int, angle_deg: float) -> None:
  63. """
  64. Set servo to a specific angle (0-180 degrees).
  65. MG90 typical pulse width: 0 deg = 500us, 180 deg = 2500us.
  66. """
  67. if pin not in self.out_pins:
  68. self.pi.set_mode(pin, pigpio.OUTPUT)
  69. self.out_pins.append(pin)
  70. # Clamp angle
  71. angle_deg = max(0, min(180, angle_deg))
  72. # Map angle to pulse width
  73. pulsewidth = int(500 + (angle_deg / 180.0) * 2000)
  74. self.pi.set_servo_pulsewidth(pin, pulsewidth)
  75. def set_servo_angle_smooth(self, pin: int, target_angle_deg: float, duration_ms: float) -> None:
  76. """
  77. Move servo to target_angle at given speed (duration in ms).
  78. """
  79. if pin < 0 or pin > 27 or duration_ms <= 0:
  80. return
  81. # Read current angle by assuming last set value (no feedback)
  82. # For simplicity, we store last angle in an instance dict
  83. if not hasattr(self, "_servo_angles"):
  84. self._servo_angles:dict[int, float] = {}
  85. if pin not in self._servo_angles:
  86. self.set_servo_angle(pin, target_angle_deg)
  87. return
  88. current_angle = self._servo_angles[pin]
  89. target_angle_deg = max(0, min(180, target_angle_deg))
  90. step = 1 if target_angle_deg > current_angle else -1
  91. delay = duration_ms / int(abs(target_angle_deg-current_angle)) / 1000
  92. for angle in range(int(current_angle), int(target_angle_deg), step):
  93. self.set_servo_angle(pin, angle)
  94. time.sleep(delay)
  95. self.set_servo_angle(pin, target_angle_deg)
  96. self._servo_angles[pin] = target_angle_deg
  97. def do_step(self, dir_pin: int, step_pin: int, steps: int = 100, step_delay_us: int = 200, direction: bool = True):
  98. """
  99. Perform a step operation on a stepper motor.
  100. dir_pin: Direction pin
  101. step_pin: Step pin to trigger steps
  102. steps: Number of steps to perform
  103. step_delay_us: Delay between steps in microseconds
  104. direction: True for forward, False for reverse
  105. """
  106. # Set direction
  107. self.set_pin(dir_pin, 1 if direction else 0)
  108. # Create pulse waveform
  109. pulses = []
  110. for _ in range(steps):
  111. pulses.append(pigpio.pulse(1 << step_pin, 0, step_delay_us)) # STEP high
  112. pulses.append(pigpio.pulse(0, 1 << step_pin, step_delay_us)) # STEP low
  113. self.pi.wave_clear()
  114. self.pi.wave_add_generic(pulses)
  115. wave_id = self.pi.wave_create()
  116. if wave_id >= 0:
  117. self.pi.wave_send_once(wave_id)
  118. while self.pi.wave_tx_busy():
  119. time.sleep(0.1)
  120. self.pi.wave_delete(wave_id)
  121. self.set_pin(dir_pin, 0)
  122. def cleanup(self):
  123. if self.pi.connected:
  124. for pin in self.out_pins:
  125. self.set_pin(pin, 0)
  126. self.pi.stop()
  127. class MockGPIO(GPIOInterface):
  128. def set_pin(self, pin: int, value: int) -> None:
  129. pass
  130. def get_pin(self, pin: int) -> int:
  131. return 0
  132. def set_servo_angle(self, pin: int, angle: float) -> None:
  133. pass
  134. def set_servo_angle_smooth(self, pin: int, target_angle: float, duration_ms: float) -> None:
  135. pass