# movement_playground.py
  1. import asyncio
  2. import time
  3. from dataclasses import dataclass
  4. from robot_control.src.robot.movement import RobotMovement
  5. from robot_control.src.api.grbl_handler import GRBLHandler
  6. from robot_control.src.utils.config import RobotConfig, GRBLConfig, MovementConfig, GPIOConfig, MQTTConfig
  7. from robot_control.src.api.gpio import PiGPIO
  8. # grbl_config = GRBLConfig(port="debug", baudrate=115200)
  9. grbl_config = GRBLConfig(port="/dev/ttyACM0", baudrate=115200)
  10. test_config = MovementConfig(
  11. speed=1000,
  12. safe_height=10,
  13. work_height=0
  14. )
  15. grbl_handler = GRBLHandler(
  16. grbl_config.port,
  17. grbl_config.baudrate
  18. )
  19. movement = RobotMovement(test_config, grbl_handler)
  20. pump_pin = 17
  21. valve_pin = 27
  22. gpio = PiGPIO([pump_pin, valve_pin], [])
  23. y_offset = 130
  24. positions = [
  25. (453, 1028 - y_offset, 0),
  26. (453, 1028 - y_offset, 107),
  27. (453, 1028 - y_offset, 0),
  28. (170, 760 - y_offset, 0),
  29. (170, 760 - y_offset, 40),
  30. (170, 760 - y_offset, 0),
  31. (453, 600 - y_offset, 0), # neutral pos
  32. ]
  33. async def wait_for_enter():
  34. # Use asyncio.Event to coordinate between input and async code
  35. event = asyncio.Event()
  36. def input_callback():
  37. input() # Wait for Enter
  38. event.set()
  39. # Run input in a separate thread to not block the event loop
  40. loop = asyncio.get_running_loop()
  41. await loop.run_in_executor(None, input_callback)
  42. await event.wait()
  43. async def perform_movement_sequence():
  44. for idx, pos in enumerate(positions):
  45. print(f"Moving to position {pos}...")
  46. if idx == 1:
  47. gpio.set_pin(pump_pin, 1)
  48. await asyncio.sleep(5)
  49. gpio.set_pin(pump_pin, 0)
  50. if idx == 2:
  51. gpio.set_pin(valve_pin, 1)
  52. await asyncio.sleep(0.5)
  53. if idx == 5:
  54. gpio.set_pin(valve_pin, 0)
  55. await asyncio.sleep(0.5)
  56. await movement.move_to_position(*pos)
  57. async def test_movement():
  58. await grbl_handler.connect()
  59. await movement.perform_homing()
  60. print("Press Enter to start movement sequence (or Ctrl+C to exit)...")
  61. while True:
  62. await wait_for_enter()
  63. print("Starting movement sequence...")
  64. await perform_movement_sequence()
  65. print("\nPress Enter to repeat sequence (or Ctrl+C to exit)...")
  66. if __name__ == "__main__":
  67. try:
  68. asyncio.run(test_movement())
  69. except KeyboardInterrupt:
  70. print("\nExiting...")