|
|
@@ -43,11 +43,19 @@ class GPIOInterface(ABC):
|
|
|
pass
|
|
|
|
|
|
class PiGPIO(GPIOInterface):
|
|
|
+ """
|
|
|
+ Enhanced PiGPIO implementation with chunked stepper motor control.
|
|
|
+ This prevents "Connection reset by peer" errors when doing large moves.
|
|
|
+ """
|
|
|
def __init__(self, in_pins: list=[], out_pins: list=[]):
|
|
|
super().__init__(in_pins, out_pins)
|
|
|
self.pi = pigpio.pi()
|
|
|
if not self.pi.connected:
|
|
|
raise RuntimeError("Could not connect to pigpiod. Is it running? Try: 'sudo systemctl start pigpiod'")
|
|
|
+
|
|
|
+ # Maximum steps per chunk to prevent connection reset
|
|
|
+ self.MAX_STEPS_PER_CHUNK = 2500 # Increased from 1000 to 2500
|
|
|
+
|
|
|
for pin in out_pins:
|
|
|
self.pi.set_mode(pin, pigpio.OUTPUT)
|
|
|
self.pi.write(pin, 0) # Ensure output pins are off initially
|
|
|
@@ -113,34 +121,171 @@ class PiGPIO(GPIOInterface):
|
|
|
|
|
|
def do_step(self, dir_pin: int, step_pin: int, steps: int = 100, step_delay_us: int = 200, direction: bool = True):
|
|
|
"""
|
|
|
- Perform a step operation on a stepper motor.
|
|
|
- dir_pin: Direction pin
|
|
|
- step_pin: Step pin to trigger steps
|
|
|
- steps: Number of steps to perform
|
|
|
- step_delay_us: Delay between steps in microseconds
|
|
|
- direction: True for forward, False for reverse
|
|
|
+ Perform a step operation on a stepper motor with chunking to prevent connection reset.
|
|
|
+
|
|
|
+ CHUNKING STRATEGY:
|
|
|
+ - Large movements (>MAX_STEPS_PER_CHUNK) are divided into smaller chunks
|
|
|
+ - Each chunk is sent as a separate waveform to prevent overwhelming pigpio
|
|
|
+ - Small delay between chunks allows pigpio to process commands
|
|
|
+
|
|
|
+ Args:
|
|
|
+ dir_pin: Direction pin
|
|
|
+ step_pin: Step pin to trigger steps
|
|
|
+ steps: Total number of steps to perform
|
|
|
+ step_delay_us: Delay between steps in microseconds
|
|
|
+ direction: True for forward, False for reverse
|
|
|
"""
|
|
|
- # Set direction
|
|
|
+ #print(f"[CHUNKED] Moving {steps} steps, direction: {direction}, delay: {step_delay_us}us")
|
|
|
+
|
|
|
+ # Set direction once at the beginning
|
|
|
self.set_pin(dir_pin, 1 if direction else 0)
|
|
|
+
|
|
|
+ remaining_steps = steps
|
|
|
+ chunk_count = 0
|
|
|
+
|
|
|
+ while remaining_steps > 0:
|
|
|
+ # Determine chunk size
|
|
|
+ chunk_size = min(remaining_steps, self.MAX_STEPS_PER_CHUNK)
|
|
|
+ chunk_count += 1
|
|
|
+
|
|
|
+ print(f" Chunk {chunk_count}: {chunk_size} steps (remaining: {remaining_steps})")
|
|
|
+
|
|
|
+ # Create pulse waveform for this chunk
|
|
|
+ pulses = []
|
|
|
+ for _ in range(chunk_size):
|
|
|
+ pulses.append(pigpio.pulse(1 << step_pin, 0, step_delay_us)) # STEP high
|
|
|
+ pulses.append(pigpio.pulse(0, 1 << step_pin, step_delay_us)) # STEP low
|
|
|
|
|
|
- # Create pulse waveform
|
|
|
- pulses = []
|
|
|
- for _ in range(steps):
|
|
|
- pulses.append(pigpio.pulse(1 << step_pin, 0, step_delay_us)) # STEP high
|
|
|
- pulses.append(pigpio.pulse(0, 1 << step_pin, step_delay_us)) # STEP low
|
|
|
+ # Send chunk waveform
|
|
|
+ self.pi.wave_clear()
|
|
|
+ self.pi.wave_add_generic(pulses)
|
|
|
+ wave_id = self.pi.wave_create()
|
|
|
|
|
|
- self.pi.wave_clear()
|
|
|
- self.pi.wave_add_generic(pulses)
|
|
|
- wave_id = self.pi.wave_create()
|
|
|
+ if wave_id >= 0:
|
|
|
+ self.pi.wave_send_once(wave_id)
|
|
|
+
|
|
|
+ # Wait for waveform to complete
|
|
|
+ while self.pi.wave_tx_busy():
|
|
|
+ time.sleep(0.01) # Small sleep while waiting
|
|
|
+
|
|
|
+ self.pi.wave_delete(wave_id)
|
|
|
+
|
|
|
+ # Small delay between chunks to prevent overwhelming pigpio
|
|
|
+ if remaining_steps > chunk_size: # Not the last chunk
|
|
|
+ time.sleep(0.001) # 50ms delay between chunks
|
|
|
+
|
|
|
+ remaining_steps -= chunk_size
|
|
|
+
|
|
|
+ # Clear direction pin
|
|
|
+ self.set_pin(dir_pin, 0)
|
|
|
+ print(f"[CHUNKED] Movement complete: {steps} steps in {chunk_count} chunks")
|
|
|
|
|
|
- if wave_id >= 0:
|
|
|
- self.pi.wave_send_once(wave_id)
|
|
|
- while self.pi.wave_tx_busy():
|
|
|
- time.sleep(0.1)
|
|
|
+ def move_to_position_mm(self, dir_pin: int, step_pin: int, target_position_mm: float,
|
|
|
+ current_position_mm: float, steps_per_mm: float,
|
|
|
+ speed_mmmin: float = 1000) -> float:
|
|
|
+ """
|
|
|
+ Move motor to target position in mm using the chunked stepper control.
|
|
|
+
|
|
|
+ Args:
|
|
|
+ dir_pin: Direction pin
|
|
|
+ step_pin: Step pin
|
|
|
+ target_position_mm: Target position in millimeters
|
|
|
+ current_position_mm: Current position in millimeters
|
|
|
+ steps_per_mm: Steps per millimeter conversion factor
|
|
|
+ speed_mmmin: Speed in mm/min
|
|
|
+
|
|
|
+ Returns:
|
|
|
+ float: New current position in mm
|
|
|
+ """
|
|
|
+ delta_mm = target_position_mm - current_position_mm
|
|
|
+ if abs(delta_mm) < 0.001: # Less than 1 micrometer
|
|
|
+ print(f"Already at target position {target_position_mm} mm")
|
|
|
+ return current_position_mm
|
|
|
+
|
|
|
+ steps_total = int(abs(delta_mm) * steps_per_mm)
|
|
|
+ direction = delta_mm > 0
|
|
|
+
|
|
|
+ # Convert speed from mm/min to step delay in microseconds
|
|
|
+ steps_per_sec = speed_mmmin * steps_per_mm / 60.0
|
|
|
+ step_delay_us = max(10, int(1_000_000 / (2 * steps_per_sec))) # *2 for high+low pulse
|
|
|
+
|
|
|
+ print(f"Moving from {current_position_mm:.2f} mm to {target_position_mm:.2f} mm")
|
|
|
+ print(f" Distance: {delta_mm:.2f} mm = {steps_total} steps")
|
|
|
+ print(f" Speed: {speed_mmmin} mm/min = {step_delay_us} µs delay")
|
|
|
+
|
|
|
+ # Use existing chunked do_step method
|
|
|
+ self.do_step(dir_pin, step_pin, steps_total, step_delay_us, direction)
|
|
|
+
|
|
|
+ return target_position_mm
|
|
|
+
|
|
|
+ def home_axis(self, dir_pin: int, step_pin: int, limit_pin: int,
|
|
|
+ speed_mmmin: float = 200, max_travel_mm: float = 200,
|
|
|
+ steps_per_mm: float = 80) -> bool:
|
|
|
+ """
|
|
|
+ Home an axis using limit switch with chunked movement.
|
|
|
+
|
|
|
+ Args:
|
|
|
+ dir_pin: Direction pin
|
|
|
+ step_pin: Step pin
|
|
|
+ limit_pin: Limit switch pin (active LOW)
|
|
|
+ speed_mmmin: Homing speed in mm/min
|
|
|
+ max_travel_mm: Maximum travel distance before giving up
|
|
|
+ steps_per_mm: Steps per millimeter conversion
|
|
|
+
|
|
|
+ Returns:
|
|
|
+ bool: True if homing successful, False if failed
|
|
|
+ """
|
|
|
+ print(f"Homing axis - limit pin {limit_pin}")
|
|
|
+
|
|
|
+ # Convert speed to step delay
|
|
|
+ steps_per_sec = speed_mmmin * steps_per_mm / 60.0
|
|
|
+ step_delay_us = max(10, int(1_000_000 / (2 * steps_per_sec)))
|
|
|
+
|
|
|
+ max_steps = int(max_travel_mm * steps_per_mm)
|
|
|
+
|
|
|
+ # Set direction (adjust based on your setup)
|
|
|
+ self.set_pin(dir_pin, 1) # Move toward limit switch
|
|
|
+
|
|
|
+ steps_moved = 0
|
|
|
+ chunk_size = 100 # Small chunks for homing
|
|
|
+
|
|
|
+ print(f"Moving toward limit switch (max {max_steps} steps)...")
|
|
|
+
|
|
|
+ while steps_moved < max_steps:
|
|
|
+ # Check limit switch before each chunk
|
|
|
+ if self.get_pin(limit_pin) == 0: # Active LOW limit switch
|
|
|
+ print(f"Limit switch triggered after {steps_moved} steps")
|
|
|
+ self.set_pin(dir_pin, 0) # Clear direction
|
|
|
+ return True
|
|
|
+
|
|
|
+ # Move a small chunk
|
|
|
+ remaining = min(chunk_size, max_steps - steps_moved)
|
|
|
+
|
|
|
+ # Create small waveform chunk for homing
|
|
|
+ pulses = []
|
|
|
+ for _ in range(remaining):
|
|
|
+ pulses.append(pigpio.pulse(1 << step_pin, 0, step_delay_us))
|
|
|
+ pulses.append(pigpio.pulse(0, 1 << step_pin, step_delay_us))
|
|
|
|
|
|
- self.pi.wave_delete(wave_id)
|
|
|
+ self.pi.wave_clear()
|
|
|
+ self.pi.wave_add_generic(pulses)
|
|
|
+ wave_id = self.pi.wave_create()
|
|
|
|
|
|
+ if wave_id >= 0:
|
|
|
+ self.pi.wave_send_once(wave_id)
|
|
|
+ while self.pi.wave_tx_busy():
|
|
|
+ time.sleep(0.001)
|
|
|
+ self.pi.wave_delete(wave_id)
|
|
|
+ else:
|
|
|
+ print("Failed to create homing waveform")
|
|
|
+ self.set_pin(dir_pin, 0)
|
|
|
+ return False
|
|
|
+
|
|
|
+ steps_moved += remaining
|
|
|
+
|
|
|
+ print(f"Homing failed - no limit switch found after {max_steps} steps")
|
|
|
self.set_pin(dir_pin, 0)
|
|
|
+ return False
|
|
|
|
|
|
def cleanup(self):
|
|
|
if self.pi.connected:
|