import time from robot_control.src.robot.mag_distributor import MagDistributor from robot_control.src.api.gpio import PiGPIO from robot_control.src.utils.config import ConfigParser import asyncio import time import sys # Assign two servo pins (update as needed) SERVO_PINS = {1: 6, 2: 13} # Example: GPIO6 and GPIO13 gpio = PiGPIO([], list(SERVO_PINS.values())) # Load config and create mag distributor for stepper control try: config_parser = ConfigParser() mag_dist = MagDistributor(config_parser.config, gpio) except Exception as e: print(f"Error: Could not initialize mag distributor: {e}") sys.exit(1) def move_servo(servo_number, angle, speed): """Move servo using PiGPIO's smooth function""" pin = SERVO_PINS.get(servo_number) if pin is None: print(f"Servo {servo_number} not configured.") return False print(f"S {servo_number} {angle} {speed} - Moving servo {servo_number} (GPIO {pin}) to {angle}° at speed {speed}") try: gpio.set_servo_angle_smooth(pin, angle, speed) print("Servo move complete.") return True except Exception as e: print(f"Error moving servo: {e}") return False def move_stepper(pos_mm, rot_deg, speed=7): # Increased default from 100 to 300 mm/min """Move stepper motor using mag distributor""" print(f"M {pos_mm} {rot_deg} {speed if speed else ''} - Moving stepper to position {pos_mm}mm, rotation {rot_deg}° at {speed} mm/min") try: if speed is not None: # Convert speed to step delay if needed step_delay = mag_dist._speed_to_step_delay(speed) mag_dist.move_mag_distributor_at_pos(pos_mm, rot_deg, step_delay) else: mag_dist.move_mag_distributor_at_pos(pos_mm, rot_deg) print("Stepper move complete.") return True except Exception as e: print(f"Error moving stepper: {e}") return False async def home_system(): """Home the mag distributor system""" print("H - Starting homing sequence...") try: await mag_dist.home() print("Homing completed successfully.") return True except Exception as e: print(f"Homing failed: {e}") return False def execute_sequence(): """Execute the predefined sequence of movements""" print("Starting Mag_to_Dist_to_Rob sequence...") print("=" * 50) # Define the sequence of commands (including homing at the start) commands = [ ("S", 1, 60, 1000), # S 1 40 1000 ("H",), # Homing sequence ("M", 298, 0, 10), # M -223 0 200 (increased speed from 100 to 200) ("S", 1, 5, 250), # S 1 0 250 ("S", 1, 60, 250), # S 1 40 250 ("M", 46, 0, 10), # M -46 0 150 (increased speed from 100 to 150) ("S", 2, 15, 500), # S 2 35 250 ("S", 1, 153, 200), # S 1 153 200 ("S", 2, 88, 500), # S 2 90 1000 ("S", 2, 130, 150), # S 2 120 150 ("S", 1, 60, 1000), # S 1 40 1000 # Defeeder pos -6 angle 120 ] success_count = 0 total_commands = len(commands) for i, command in enumerate(commands, 1): print(f"\nStep {i}/{total_commands}:") if command[0] == "H": # Homing command success = asyncio.run(home_system()) elif command[0] == "S": # Servo command _, servo_num, angle, speed = command success = move_servo(servo_num, angle, speed) elif command[0] == "M": # Motor/stepper command _, pos_mm, rot_deg, speed = command success = move_stepper(pos_mm, rot_deg, speed) else: print(f"Unknown command type: {command[0]}") success = False if success: success_count += 1 else: print(f"Command {i} failed!") response = input("Continue with next command? (y/n): ").strip().lower() if response != 'y': print("Sequence aborted by user.") break # Wait 1 second between commands (except for the last one) if i < total_commands: print("Waiting 1 second...") time.sleep(2) print("\n" + "=" * 50) print(f"Sequence completed: {success_count}/{total_commands} commands successful") return success_count == total_commands def main(): """Main function""" print("Mag_to_Dist_to_Rob - Automated Movement Sequence with Homing") print("This script will execute a predefined sequence starting with homing.") print("\nSequence includes:") print("1. Homing sequence (H)") print("2. Servo and stepper movements as specified") print("\nPress Enter to start the sequence, or Ctrl+C to exit...") try: input() success = execute_sequence() if success: print("\n✓ All movements completed successfully!") else: print("\n⚠ Some movements failed. Check the output above.") except KeyboardInterrupt: print("\n\nSequence interrupted by user.") except Exception as e: print(f"\nUnexpected error: {e}") finally: # Cleanup try: gpio.cleanup() print("GPIO cleanup completed.") except: pass if __name__ == "__main__": main()