| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152 |
- import time
- import pigpio
- from robot_control.src.utils.config import RobotConfig
- from robot_control.src.api.gpio import GPIOInterface
class PumpController:
    """Two-threshold (hysteresis) on/off controller for a vacuum pump on a GPIO pin.

    The controller is fed tank pressure readings via ``handle_tank_reading``.
    It switches the pump on when the reading rises above the "on" threshold,
    off when it drops below the "off" threshold, and forces it off when it
    has been running longer than the watchdog timeout. Lower readings mean
    more vacuum.
    """

    def __init__(self, config: "RobotConfig", gpio: "GPIOInterface", logger):
        """Read thresholds and pin assignment from ``config`` and store collaborators.

        Args:
            config: Provides ``vacuum.min_pressure_bar``,
                ``vacuum.max_pressure_bar``, ``vacuum.pump_watchdog_timeout_s``,
                ``vacuum.gripping_threshold_bar`` and ``gpio.pump_pin``.
            gpio: Object exposing ``set_pin(pin, value)``; used for every pump
                pin write.
            logger: Object exposing ``info(msg)`` and ``warning(msg)``.
        """
        self.logger = logger
        self.gpio = gpio

        vacuum = config.vacuum
        # Pump turns ON when the reading is above this value ...
        self.pressure_threshold_on = vacuum.min_pressure_bar
        # ... and OFF again once the reading falls below this value.
        self.pressure_threshold_off = vacuum.max_pressure_bar
        self.pump_watchdog_timeout = vacuum.pump_watchdog_timeout_s
        self.gripping_threshold = vacuum.gripping_threshold_bar

        self.pump_pin = config.gpio.pump_pin

        # time.time() stamp of the last activation; None while the pump is off.
        self.pump_last_activated = None
        self.pump_active = False

    def check_endeffector_state(self, value) -> bool:
        """Return True when ``value`` is strictly below the gripping threshold."""
        return value < self.gripping_threshold

    def _switch_pump(self, active: bool, activated_at=None) -> None:
        """Drive the pump pin and keep the bookkeeping attributes in sync."""
        self.gpio.set_pin(self.pump_pin, 1 if active else 0)
        self.pump_active = active
        self.pump_last_activated = activated_at if active else None

    def handle_tank_reading(self, value):
        """Update the pump state from one tank pressure reading.

        Order of evaluation:
        1. If the pump is running and has exceeded the watchdog timeout it is
           switched off, a warning is logged, and the reading is otherwise
           ignored for this call.
        2. If the pump is off and ``value`` is above the "on" threshold the
           pump is switched on.
        3. If the pump is on and ``value`` is below the "off" threshold the
           pump is switched off.

        Args:
            value: Current tank pressure reading (lower value is more vacuum).
        """
        current_time = time.time()

        # Explicit None check: a falsy timestamp must not disable the watchdog.
        watchdog_expired = (
            self.pump_active
            and self.pump_last_activated is not None
            and current_time - self.pump_last_activated > self.pump_watchdog_timeout
        )
        if watchdog_expired:
            self.logger.warning("Pump deactivated due to watchdog timeout")
            self._switch_pump(False)
            return

        if not self.pump_active and value > self.pressure_threshold_on:
            self.logger.info("Activating pump due to pressure threshold")
            self._switch_pump(True, current_time)
        elif self.pump_active and value < self.pressure_threshold_off:
            self.logger.info("Deactivating pump due to pressure threshold")
            self._switch_pump(False)

    def cleanup(self):
        """Switch the pump off and release the pigpio handle if one is attached.

        The pin write goes through ``self.gpio`` like every other pump
        operation. This class never assigns ``self.pi`` itself, so a raw
        pigpio handle is only stopped when one has been attached to the
        instance from outside.
        """
        self._switch_pump(False)  # Ensure pump is off

        pi = getattr(self, "pi", None)
        if pi is not None:
            pi.stop()  # Disconnect from pigpio daemon

        self.logger.info("Pigpio cleanup completed")
|