浏览代码

feat: cleaned up unnecessary codes, added gpio stepper pins initialization in integration_tets and main

Your Name 3 月之前
父节点
当前提交
2ee7d6747b

+ 1 - 1
main.py

@@ -21,7 +21,7 @@ class LoaderSystem:
         if gpio_config.debug:
             self.gpio = MockGPIO()
         else:
-            self.gpio = PiGPIO(out_pins=[gpio_config.pump_pin, gpio_config.valve_pin])
+            self.gpio = PiGPIO(out_pins=[gpio_config.pump_pin, gpio_config.valve_pin, gpio_config.mag_dist_pos_dir_pin, gpio_config.mag_dist_pos_step_pin, gpio_config.mag_dist_pos_en_pin])
 
         self.logger = logging.getLogger(__name__)
 

+ 1 - 1
playgrounds/camera_playground.py

@@ -7,7 +7,7 @@ def main():
     picam2.start()
 
 
-    x, y, w, h = [290, 140, 240, 170]
+    x, y, w, h = [100, 0, 260, 450]
 
     print("Type 's' to save a still image. Type 'q' to quit.")
     while True:

+ 0 - 0
playgrounds/camera_test.py


+ 43 - 0
playgrounds/datamatrix_playground.py

@@ -0,0 +1,43 @@
+from picamera2 import Picamera2
+import cv2
+from pylibdmtx.pylibdmtx import decode
+
+# Initialize camera
+picam2 = Picamera2()
+picam2.configure(picam2.create_preview_configuration(main={"format": "BGR888", "size": (640, 480)}))
+picam2.start()
+
+# Your configured ROI
+x, y, w, h = 100, 0, 260, 450  # replace with your new values
+
+
+print("Press Enter to capture and decode a frame. Ctrl+C to exit.")
+
+while True:
+    input("Capture frame now...")
+    
+    # Capture frame
+    frame = picam2.capture_array()
+    
+    # Crop to ROI
+    roi = frame[y:y+h, x:x+w]
+    
+    # Convert to grayscale + enhance contrast
+    gray = cv2.cvtColor(roi, cv2.COLOR_BGR2GRAY)
+    gray = cv2.equalizeHist(gray)
+    
+    # Optional: save the cropped ROI to check visually
+    cv2.imwrite("roi_test.jpg", gray)
+    
+    # Decode DataMatrix
+    decoded = decode(gray)
+    
+    if decoded:
+        print("DataMatrix detected!")
+        for d in decoded:
+            print("Decoded content:", d.data.decode('utf-8'))
+    else:
+        print("No DataMatrix detected in this frame.")
+
+picam2.stop()
+

+ 0 - 0
playgrounds/grbl_diagnostic.py


+ 14 - 1
playgrounds/integration_test.py

@@ -18,6 +18,19 @@ This is a test script for the loader system.
 It initializes the robot controller, vision system, and I2C handler,
 """
 
+# Configure root logger
+logging.basicConfig(
+    level=logging.INFO,  # show INFO and above (use DEBUG for more details)
+    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s"
+)
+
+# Make sure all existing loggers propagate to root
+for name, logger_obj in logging.Logger.manager.loggerDict.items():
+    if isinstance(logger_obj, logging.Logger):
+        logger_obj.setLevel(logging.INFO)
+        logger_obj.propagate = True
+
+        
 async def wait_for_enter():
     # Use asyncio.Event to coordinate between input and async code
     event = asyncio.Event()
@@ -40,7 +53,7 @@ class LoaderSystem:
         if gpio_config.debug:
             self.gpio = MockGPIO()
         else:
-            self.gpio = PiGPIO(out_pins=[gpio_config.pump_pin, gpio_config.valve_pin])
+            self.gpio = PiGPIO(out_pins=[gpio_config.pump_pin, gpio_config.valve_pin, gpio_config.mag_dist_pos_dir_pin, gpio_config.mag_dist_pos_step_pin, gpio_config.mag_dist_pos_en_pin])
 
         # Initialize vision system
         self.vision = DataMatrixReader(self.config.vision)

+ 28 - 5
playgrounds/mag_to_feeder_moving_sequence.py

@@ -5,6 +5,26 @@ from robot_control.src.utils.config import ConfigParser
 import asyncio
 import time
 import sys
+import pigpio
+
+# Initialize pigpio directly for stepper controlpython -m playgrounds.mag_to_feeder_moving_sequence
+pi = pigpio.pi()
+if not pi.connected:
+    print("Failed to connect to pigpio daemon")
+    sys.exit(1)
+
+# Stepper pins (BCM)
+DIR_PIN = 11
+STEP_PIN = 9
+EN_PIN = 10
+
+# Set up pins
+pi.set_mode(DIR_PIN, pigpio.OUTPUT)
+pi.set_mode(STEP_PIN, pigpio.OUTPUT)
+pi.set_mode(EN_PIN, pigpio.OUTPUT)
+
+# Enable motor driver (LOW = enabled)
+pi.write(EN_PIN, 0)
 
 # Assign two servo pins (update as needed)
 SERVO_PINS = {1: 6, 2: 13}  # Example: GPIO6 and GPIO13
@@ -35,14 +55,14 @@ def move_servo(servo_number, angle, speed):
         print(f"Error moving servo: {e}")
         return False
 
-def move_stepper(pos_mm, rot_deg, speed=7):  # Increased default from 100 to 300 mm/min
+def move_stepper(pos_mm, rot_deg, speed):  # Increased default from 100 to 300 mm/min
     """Move stepper motor using mag distributor"""
     print(f"M {pos_mm} {rot_deg} {speed if speed else ''} - Moving stepper to position {pos_mm}mm, rotation {rot_deg}° at {speed} mm/min")
     try:
         if speed is not None:
             # Convert speed to step delay if needed
             step_delay = mag_dist._speed_to_step_delay(speed)
-            mag_dist.move_mag_distributor_at_pos(pos_mm, rot_deg, step_delay)
+            mag_dist.move_mag_distributor_at_pos(pos_mm, rot_deg, 1000, step_delay)
         else:
             mag_dist.move_mag_distributor_at_pos(pos_mm, rot_deg)
         print("Stepper move complete.")
@@ -71,10 +91,10 @@ def execute_sequence():
     commands = [
         ("S", 1, 60, 1000),     # S 1 40 1000
         ("H",),                 # Homing sequence
-        ("M", 298, 0, 10),    # M -223 0 200 (increased speed from 100 to 200)
-        ("S", 1, 5, 250),       # S 1 0 250
+        ("M", 298, 60, 700),    # M -223 0 200 (increased speed from 100 to 200)
+        ("S", 1, 5, 200),       # S 1 0 250 
         ("S", 1, 60, 250),      # S 1 40 250
-        ("M", 46, 0, 10),     # M -46 0 150 (increased speed from 100 to 150)
+        ("M", 46, 60, 700),     # M -46 0 150 (increased speed from 100 to 150)
         ("S", 2, 15, 500),      # S 2 35 250
         ("S", 1, 153, 200),     # S 1 153 200
         ("S", 2, 88, 500),     # S 2 90 1000
@@ -145,6 +165,9 @@ def main():
     finally:
         # Cleanup
         try:
+            # Disable motor driver
+            pi.write(EN_PIN, 1)
+            pi.stop()
             gpio.cleanup()
             print("GPIO cleanup completed.")
         except:

+ 0 - 0
playgrounds/motor_pwm_test.py


+ 0 - 36
playgrounds/servo_direct.py

@@ -1,36 +0,0 @@
-import pigpio
-
-def main():
-
-    pi = pigpio.pi()
-    servo_pin = 13
-
-    print("Servo Playground")
-    print("Type an angle (0-180) to set the servo, or 'q' to quit.")
-
-    while True:
-        cmd = input("Enter angle: ").strip()
-        if cmd == "q":
-            print("Exiting.")
-            break
-
-        if not pi.connected:
-            raise RuntimeError(f"Failed to connect to pigpio daemon for pin {servo_pin}")
-        
-        try:
-            angle = float(cmd)
-            if not (0 <= angle <= 180):
-                print("Angle must be between 0 and 180.")
-                continue
-
-            # Map angle to pulse width
-            pulsewidth = int(500 + (angle / 180.0) * 2000)
-            pi.set_servo_pulsewidth(servo_pin, pulsewidth)
-
-            print(f"Set servo to {angle} degrees.")
-            
-        except Exception as e:
-            print(f"Invalid input: {e}")
-            
-if __name__ == "__main__":
-    main()

+ 0 - 0
playgrounds/test_arduino_port.py


+ 1 - 1
robot_control/config/config.yaml

@@ -101,7 +101,7 @@ vision:
   frame_rate: 30
   exposure: 0.1
   gain: 1.0
-  bbox: [290, 140, 240, 170]  # (x, y, width, height)
+  bbox: [170, 190, 240, 170]  # (x, y, width, height)
 
 vacuum:
   min_pressure_bar: 0.13

+ 12 - 8
robot_control/src/api/gpio.py

@@ -54,7 +54,7 @@ class PiGPIO(GPIOInterface):
             raise RuntimeError("Could not connect to pigpiod. Is it running? Try: 'sudo systemctl start pigpiod'")
         
         # Maximum steps per chunk to prevent connection reset
-        self.MAX_STEPS_PER_CHUNK = 2500 # Increased from 1000 to 2500
+        self.MAX_STEPS_PER_CHUNK =300# Increased from 1000 to 2500
 
         for pin in out_pins:
             self.pi.set_mode(pin, pigpio.OUTPUT)
@@ -144,7 +144,7 @@ class PiGPIO(GPIOInterface):
             direction: True for forward, False for reverse
         """
         #print(f"[CHUNKED] Moving {steps} steps, direction: {direction}, delay: {step_delay_us}us")
-        self.set_pin(en_pin, 0)
+        self.pi.write(en_pin, 0)
 
         # Set direction once at the beginning
         self.set_pin(dir_pin, 1 if direction else 0)
@@ -175,16 +175,20 @@ class PiGPIO(GPIOInterface):
                 
                 # Wait for waveform to complete
                 while self.pi.wave_tx_busy():
-                    time.sleep(0.01)  # Small sleep while waiting
-
+                    time.sleep(0.001)  # Small sleep while waiting REDUCED from 0.01 to 0.0001 (100μs)
                 self.pi.wave_delete(wave_id)
+            else:
+                print("Failed to create waveform!")
+                break
                 
                 # Small delay between chunks to prevent overwhelming pigpio
-                if remaining_steps > chunk_size:  # Not the last chunk
-                    time.sleep(0.001)  # 50ms delay between chunks
+                #if remaining_steps > chunk_size:  # Not the last chunk
+                    #time.sleep(0.001)  # 50ms delay between chunks
                     
             remaining_steps -= chunk_size
 
+        self.pi.write(en_pin, 1)
+
         # Clear direction pin
         self.set_pin(dir_pin, 0)
         #print(f"[CHUNKED] Movement complete: {steps} steps in {chunk_count} chunks")
@@ -216,7 +220,7 @@ class PiGPIO(GPIOInterface):
         
         # Convert speed from mm/min to step delay in microseconds
         steps_per_sec = speed_mmmin * steps_per_mm / 60.0
-        step_delay_us = max(10, int(1_000_000 / (2 * steps_per_sec)))  # *2 for high+low pulse
+        step_delay_us = max(10, int(5_000_0 / ( steps_per_sec)))  # *2 for high+low pulse
         
         print(f"Moving from {current_position_mm:.2f} mm to {target_position_mm:.2f} mm")
         print(f"  Distance: {delta_mm:.2f} mm = {steps_total} steps")
@@ -248,7 +252,7 @@ class PiGPIO(GPIOInterface):
         
         # Convert speed to step delay
         steps_per_sec = speed_mmmin * steps_per_mm / 60.0
-        step_delay_us = max(10, int(1_000_000 / (2 * steps_per_sec)))
+        step_delay_us = max(10, int(1_000_000 / (2* steps_per_sec)))
         
         max_steps = int(max_travel_mm * steps_per_mm)
         

+ 2 - 2
robot_control/src/robot/controller.py

@@ -465,8 +465,8 @@ class RobotController:
         """
         io_conf = self.config.gpio
         while not self.next_cell_id:
-            #self.next_cell_id = self.vision.read_datamatrix() # commented out for test purposes Y
-            self.next_cell_id = 1
+            self.next_cell_id = self.vision.read_datamatrix() # commented out for test purposes Y
+            #self.next_cell_id = 1
             if not self.next_cell_id:
                 logger.debug("No cell detected")
                 return False  # No cell detected

+ 2 - 2
robot_control/src/robot/mag_distributor.py

@@ -72,7 +72,7 @@ class MagDistributor:
             print(f"Servo {rot_servo_pin} not configured.")
             return False        
         try:
-            gpio.set_servo_angle_smooth(pot_servo_pin, angle)
+            gpio.set_servo_angle_smooth(rot_servo_pin, angle)
             print("Servo move complete.")
             return True
         except Exception as e:
@@ -92,7 +92,7 @@ class MagDistributor:
         self.curr_rot_deg = 0
         self.logger.info("Mag distributor homed successfully.")
 
-    async def _home_axis(self, en_pin, dir_pin, step_pin, endstop_pin, axis='linear', max_steps = 10000, step_delay=50):
+    async def _home_axis(self, en_pin, dir_pin, step_pin, endstop_pin, axis='linear', max_steps = 10000, step_delay=20):
         """
         Home a single axis by moving until the endstop is triggered.
         """

+ 1 - 1
robot_control/src/utils/config.py

@@ -88,7 +88,7 @@ class VisionConfig(BaseModel):
     frame_rate: int = Field(default=30, ge=1)
     exposure: float = Field(default=0.1, ge=0.0)
     gain: float = Field(default=1.0, ge=0.0)
-    bbox: Tuple[int, int, int, int] = Field(default=(0, 0, 640, 480)) # (x, y, width, height)
+    bbox: Tuple[int, int, int, int] = Field(default=(170, 190, 240, 170)) # (x, y, width, height)
 
     @model_validator(mode='after')
     def validate_bbox(self) -> 'VisionConfig':

+ 11 - 1
robot_control/src/vision/datamatrix.py

@@ -79,12 +79,22 @@ class DataMatrixReader:
 
         if not ret or frame is None or frame.size == 0:
             raise Exception("Failed to capture image")
+        
+        # DEBUG: added (save cropped bbox frame, overwrites each time)
+        cv2.imwrite("./captured_frame.png", frame)
+
+
         if self.bbox:
-            x, y, w, h = self.bbox
+            #x, y, w, h = self.bbox
+            x, y, w, h = 80, 0, 220, 450 #Added values for bbox here as i couldnt update config
             frame = frame[y:y+h, x:x+w]
             # Convert to grayscale, apply histogram equalization, blur, and adaptive thresholding
             frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
             frame = cv2.equalizeHist(frame)
+
+            # DEBUG: added (save cropped bbox frame)
+            cv2.imwrite("./captured_bbox.png", frame)
+
             # frame = cv2.medianBlur(frame, 3)
             # frame = cv2.adaptiveThreshold(frame, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
             #           cv2.THRESH_BINARY, 21, 2)

二进制
roi_test.jpg


二进制
still_image.jpg