pump_controller.py 2.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152
  1. import time
  2. import pigpio
  3. from robot_control.src.utils.config import RobotConfig
  4. from robot_control.src.api.gpio import GPIOInterface
  5. class PumpController:
  6. def __init__(self, config:RobotConfig, gpio:GPIOInterface, logger):
  7. self.logger = logger
  8. self.pressure_threshold_on = config.vacuum.min_pressure_bar
  9. self.pressure_threshold_off = config.vacuum.max_pressure_bar
  10. self.pump_watchdog_timeout = config.vacuum.pump_watchdog_timeout_s
  11. self.gripping_threshold = config.vacuum.gripping_threshold_bar
  12. gpio_config = config.gpio
  13. self.pump_pin = gpio_config.pump_pin
  14. self.pump_last_activated = None
  15. self.pump_active = False
  16. self.gpio = gpio
  17. def check_endeffector_state(self, value) -> bool:
  18. return value < self.gripping_threshold
  19. def handle_tank_reading(self, value):
  20. current_time = time.time()
  21. # Check if pump needs to be deactivated due to watchdog timeout
  22. if self.pump_active and self.pump_last_activated and \
  23. (current_time - self.pump_last_activated > self.pump_watchdog_timeout):
  24. self.logger.warning("Pump deactivated due to watchdog timeout")
  25. self.gpio.set_pin(self.pump_pin, 0) # Turn off pump
  26. self.pump_active = False
  27. self.pump_last_activated = None
  28. return
  29. # Activate pump if pressure is above the threshold (lower value is more vacuum)
  30. if not self.pump_active and value > self.pressure_threshold_on:
  31. self.logger.info("Activating pump due to pressure threshold")
  32. self.gpio.set_pin(self.pump_pin, 1) # Turn on pump
  33. self.pump_active = True
  34. self.pump_last_activated = current_time
  35. # Deactivate pump if pressure is below the threshold
  36. elif self.pump_active and value < self.pressure_threshold_off:
  37. self.logger.info("Deactivating pump due to pressure threshold")
  38. self.gpio.set_pin(self.pump_pin, 0) # Turn off pump
  39. self.pump_active = False
  40. self.pump_last_activated = None
  41. def cleanup(self):
  42. self.pi.write(self.pump_pin, 0) # Ensure pump is off
  43. self.pi.stop() # Disconnect from pigpio daemon
  44. self.logger.info("Pigpio cleanup completed")