|
|
@@ -0,0 +1,204 @@
|
|
|
+import asyncio
|
|
|
+import logging
|
|
|
+import time
|
|
|
+from robot_control.src.utils.config import ConfigParser
|
|
|
+from robot_control.src.api.gpio import PiGPIO, MockGPIO
|
|
|
+from robot_control.src.api.i2c_handler import I2C, MockI2C
|
|
|
+from robot_control.src.vendor.mcp3428 import MCP3428
|
|
|
+
|
|
|
+# Configure logging
|
|
|
+logging.basicConfig(
|
|
|
+ level=logging.INFO,
|
|
|
+ format="%(asctime)s [%(levelname)s] %(name)s: %(message)s"
|
|
|
+)
|
|
|
+
|
|
|
+logger = logging.getLogger(__name__)
|
|
|
+
|
|
|
+# Voltage conversion factor (from controller.py)
|
|
|
+VOLTAGE_CONVERSION_FACTOR = 5.1455
|
|
|
+
|
|
|
+class VoltageTest:
|
|
|
+ def __init__(self):
|
|
|
+ self.config = ConfigParser().config
|
|
|
+
|
|
|
+ # Initialize GPIO (include servo pin)
|
|
|
+ gpio_config = self.config.gpio
|
|
|
+ if gpio_config.debug:
|
|
|
+ self.gpio = MockGPIO()
|
|
|
+ logger.info("Using MockGPIO for testing")
|
|
|
+ else:
|
|
|
+ self.gpio = PiGPIO(out_pins=[gpio_config.measure_probe_oping, gpio_config.measure_servo_pin])
|
|
|
+ logger.info("Using real GPIO")
|
|
|
+
|
|
|
+ # Initialize I2C
|
|
|
+ i2c_device_class = MCP3428 if not self.config.i2c.debug else MockI2C
|
|
|
+ self.i2c = I2C(i2c_device_class)
|
|
|
+ self.i2c.initialize()
|
|
|
+ logger.info(f"I2C initialized with {i2c_device_class.__name__}")
|
|
|
+
|
|
|
+ self.io_conf = self.config.gpio
|
|
|
+
|
|
|
+ async def measure_cell_voltage(self) -> float:
|
|
|
+ """
|
|
|
+ Measure cell voltage by moving servo and deploying probe
|
|
|
+ """
|
|
|
+ if self.io_conf.measure_probe_oping < 0:
|
|
|
+ logger.warning("No probe pin configured, cannot measure voltage")
|
|
|
+ return 0.0
|
|
|
+
|
|
|
+ try:
|
|
|
+ # Move servo to start position first
|
|
|
+ logger.info(f"Moving servo to start position ({self.io_conf.measure_servo_angle_start}°)...")
|
|
|
+ self.gpio.set_servo_angle_smooth(
|
|
|
+ self.io_conf.measure_servo_pin,
|
|
|
+ self.io_conf.measure_servo_angle_start,
|
|
|
+ 1000
|
|
|
+ )
|
|
|
+ await asyncio.sleep(1.0) # Wait for servo to reach position
|
|
|
+
|
|
|
+ # Move servo to probe position
|
|
|
+ logger.info(f"Moving servo to probe position ({self.io_conf.measure_servo_angle_probe}°)...")
|
|
|
+ self.gpio.set_servo_angle_smooth(
|
|
|
+ self.io_conf.measure_servo_pin,
|
|
|
+ self.io_conf.measure_servo_angle_probe,
|
|
|
+ 500
|
|
|
+ )
|
|
|
+ await asyncio.sleep(1.0) # Wait for servo to reach position
|
|
|
+
|
|
|
+ logger.info("Deploying measurement probe...")
|
|
|
+ self.gpio.set_pin(self.io_conf.measure_probe_oping, 1)
|
|
|
+ await asyncio.sleep(0.1) # Wait for probe to deploy
|
|
|
+
|
|
|
+ logger.info("Reading voltage from I2C channel 1...")
|
|
|
+ raw_value = await self.i2c.read_channel(1)
|
|
|
+ voltage = raw_value * VOLTAGE_CONVERSION_FACTOR
|
|
|
+
|
|
|
+ await asyncio.sleep(0.5)
|
|
|
+
|
|
|
+ logger.info("Retracting measurement probe...")
|
|
|
+ self.gpio.set_pin(self.io_conf.measure_probe_oping, 0)
|
|
|
+
|
|
|
+ # Move servo back to start position
|
|
|
+ logger.info(f"Moving servo back to start position ({self.io_conf.measure_servo_angle_start}°)...")
|
|
|
+ self.gpio.set_servo_angle_smooth(
|
|
|
+ self.io_conf.measure_servo_pin,
|
|
|
+ self.io_conf.measure_servo_angle_start,
|
|
|
+ 1000
|
|
|
+ )
|
|
|
+ await asyncio.sleep(1.0) # Wait for servo to return
|
|
|
+
|
|
|
+ logger.info(f"Raw I2C value: {raw_value}")
|
|
|
+ logger.info(f"Calculated voltage: {voltage:.3f}V")
|
|
|
+
|
|
|
+ return voltage
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ logger.error(f"Error measuring voltage: {str(e)}")
|
|
|
+ # Make sure probe is retracted and servo is in start position on error
|
|
|
+ try:
|
|
|
+ self.gpio.set_pin(self.io_conf.measure_probe_oping, 0)
|
|
|
+ self.gpio.set_servo_angle_smooth(
|
|
|
+ self.io_conf.measure_servo_pin,
|
|
|
+ self.io_conf.measure_servo_angle_start,
|
|
|
+ 1000
|
|
|
+ )
|
|
|
+ except:
|
|
|
+ pass
|
|
|
+ return 0.0
|
|
|
+
|
|
|
+ def check_cell_voltage(self, voltage: float) -> bool:
|
|
|
+ """
|
|
|
+ Check if the cell voltage is within acceptable range
|
|
|
+ """
|
|
|
+ min_voltage = abs(self.config.feeder.min_voltage)
|
|
|
+
|
|
|
+ if voltage < min_voltage:
|
|
|
+ logger.warning(f"Cell voltage too low: {voltage:.3f}V < {min_voltage}V")
|
|
|
+ return False
|
|
|
+ if voltage < 0:
|
|
|
+ logger.warning(f"Cell has wrong polarity: {voltage:.3f}V")
|
|
|
+ return False
|
|
|
+
|
|
|
+ logger.info(f"Cell voltage is good: {voltage:.3f}V >= {min_voltage}V")
|
|
|
+ return True
|
|
|
+
|
|
|
+ async def continuous_voltage_test(self, interval: float = 2.0):
|
|
|
+ """
|
|
|
+ Continuously measure voltage at specified intervals
|
|
|
+ """
|
|
|
+ logger.info(f"Starting continuous voltage measurement (interval: {interval}s)")
|
|
|
+ logger.info("Press Ctrl+C to stop...")
|
|
|
+
|
|
|
+ try:
|
|
|
+ measurement_count = 0
|
|
|
+ while True:
|
|
|
+ measurement_count += 1
|
|
|
+ logger.info(f"\n--- Measurement #{measurement_count} ---")
|
|
|
+
|
|
|
+ voltage = await self.measure_cell_voltage()
|
|
|
+ is_good = self.check_cell_voltage(voltage)
|
|
|
+
|
|
|
+ status = "GOOD" if is_good else "BAD"
|
|
|
+ logger.info(f"Result: {voltage:.3f}V - Status: {status}")
|
|
|
+
|
|
|
+ await asyncio.sleep(interval)
|
|
|
+
|
|
|
+ except KeyboardInterrupt:
|
|
|
+ logger.info("\nStopping voltage test...")
|
|
|
+
|
|
|
+ async def single_voltage_test(self):
|
|
|
+ """
|
|
|
+ Perform a single voltage measurement
|
|
|
+ """
|
|
|
+ logger.info("Performing single voltage measurement...")
|
|
|
+
|
|
|
+ voltage = await self.measure_cell_voltage()
|
|
|
+ is_good = self.check_cell_voltage(voltage)
|
|
|
+
|
|
|
+ status = "GOOD" if is_good else "BAD"
|
|
|
+ logger.info(f"\nFinal Result: {voltage:.3f}V - Status: {status}")
|
|
|
+
|
|
|
+ return voltage, is_good
|
|
|
+
|
|
|
+ def cleanup(self):
|
|
|
+ """
|
|
|
+ Cleanup resources
|
|
|
+ """
|
|
|
+ logger.info("Cleaning up...")
|
|
|
+ self.gpio.cleanup()
|
|
|
+
|
|
|
+async def main():
|
|
|
+ voltage_test = VoltageTest()
|
|
|
+
|
|
|
+ try:
|
|
|
+ print("\nVoltage Test Options:")
|
|
|
+ print("1. Single measurement")
|
|
|
+ print("2. Continuous measurement (every 2 seconds)")
|
|
|
+ print("3. Custom interval continuous measurement")
|
|
|
+
|
|
|
+ choice = input("\nEnter your choice (1-3): ").strip()
|
|
|
+
|
|
|
+ if choice == "1":
|
|
|
+ await voltage_test.single_voltage_test()
|
|
|
+ elif choice == "2":
|
|
|
+ await voltage_test.continuous_voltage_test(2.0)
|
|
|
+ elif choice == "3":
|
|
|
+ try:
|
|
|
+ interval = float(input("Enter measurement interval in seconds: "))
|
|
|
+ await voltage_test.continuous_voltage_test(interval)
|
|
|
+ except ValueError:
|
|
|
+ logger.error("Invalid interval, using default 2 seconds")
|
|
|
+ await voltage_test.continuous_voltage_test(2.0)
|
|
|
+ else:
|
|
|
+ logger.error("Invalid choice, performing single measurement")
|
|
|
+ await voltage_test.single_voltage_test()
|
|
|
+
|
|
|
+ except KeyboardInterrupt:
|
|
|
+ logger.info("Test interrupted by user")
|
|
|
+ except Exception as e:
|
|
|
+ logger.error(f"Test failed: {str(e)}")
|
|
|
+ finally:
|
|
|
+ voltage_test.cleanup()
|
|
|
+
|
|
|
+if __name__ == "__main__":
|
|
|
+ asyncio.run(main())
|