瀏覽代碼

ref: update image files in tests; move smbus dep to vendor; add mock i2c; improve logging and configuration handling; add bbox to vision

Silas Gruen 8 月之前
父節點
當前提交
02ad8c01d4

+ 21 - 12
main.py

@@ -3,25 +3,36 @@ import logging
 from robot_control.src.robot.controller import RobotController
 from robot_control.src.utils.config import ConfigParser
 from robot_control.src.vision.datamatrix import DataMatrixReader
-from robot_control.src.api.i2c_handler import I2C
+from robot_control.src.api.i2c_handler import I2C, MockI2C
 from robot_control.src.vendor.mcp3428 import MCP3428
 from robot_control.src.robot.pump_controller import PumpController
-from robot_control.src.api.gpio import PiGPIO
+from robot_control.src.api.gpio import PiGPIO, MockGPIO
+
+logging.basicConfig(
+    level=logging.INFO,
+    format='%(asctime)s - %(module)s - %(levelname)s - %(message)s',
+)
 
 class LoaderSystem:
     def __init__(self):
         self.config = ConfigParser().config
         gpio_config = self.config.gpio
-        self.gpio = PiGPIO([gpio_config.pump_pin, gpio_config.valve_pin], [])
+        if gpio_config.debug:
+            self.gpio = MockGPIO()
+        else:
+            self.gpio = PiGPIO([gpio_config.pump_pin, gpio_config.valve_pin], [])
 
         self.logger = logging.getLogger(__name__)
         self.controller = RobotController(self.config, self.gpio)
-        self.vision = DataMatrixReader(self.config.vision.camera_id)
+        self.vision = DataMatrixReader(self.config)
         self.logger.info("Initializing LoaderSystem")
         self.vision.initialize()
-        self.i2c = I2C(MCP3428)
+
+        # Use mock I2C device if debug is enabled
+        i2c_device_class = MCP3428 if not self.config.i2c.debug else MockI2C
+        self.i2c = I2C(i2c_device_class)
         self.i2c.initialize()
-        self.logger.info("I2C initialized for MCP3428")
+        self.logger.info(f"I2C initialized with {i2c_device_class.__name__}")
 
         self.pump_controller = PumpController(self.config, self.gpio, self.logger)
 
@@ -38,15 +49,12 @@ class LoaderSystem:
             try:
                 readings = await self.i2c.read_channels([1, 3, 4])
                 for channel, value in readings.items():
-                    self.logger.info(f"Channel {channel} reading: {value}")
+                    self.logger.debug(f"Channel {channel} reading: {value}")
                     if channel == 3:  # Pressure reading
                         self.pump_controller.handle_tank_reading(value)
                     if channel == 4:
                         state = self.pump_controller.check_endeffector_state(value)
                         self.controller.set_suction_state(state)
-                    # Perform a generic task based on the reading
-                    if value > 0.5:  # Example condition
-                        self.logger.warning(f"Value {value} on channel {channel} exceeds threshold")
             except Exception as e:
                 self.logger.error(f"Error polling I2C channels: {str(e)}")
             await asyncio.sleep(1)  # Poll every second
@@ -61,10 +69,11 @@ class LoaderSystem:
                 if not slot:
                     break
                 # Pick and place new cell
-                cell_id = self.vision.read_datamatrix()
-                if not cell_id:
+                cell_id_str = self.vision.read_datamatrix()
+                if not cell_id_str:
                     self.logger.debug("No cell detected")
                     break
+                cell_id = int(cell_id_str)
                 self.logger.info(f"Processing cell {cell_id}")
                 cell = self.controller.add_cell(cell_id)
                 try:

+ 9 - 5
robot_control/config/config.yaml

@@ -50,24 +50,28 @@ grbl:
   baudrate: 115200
 
 vision:
-  camera_id: -1
+  camera_id: 0
   resolution: [640, 480]
   frame_rate: 30
   exposure: 0.1
   gain: 1.0
+  bbox: [100, 100,  200, 200]  # (x, y, width, height)
 
 vacuum:
-  min_pressure_bar: 0.5
-  max_pressure_bar: 1.0
+  min_pressure_bar: -0.85
+  max_pressure_bar: -0.35
   max_pump_time_s: 30
-  gripping_threshold_bar: 0.8
+  gripping_threshold_bar: -0.35
   pump_watchdog_timeout_s: 30
 
 gpio:
-  debug: false
+  debug: true
   pump_pin: 17
   valve_pin: 27
 
+i2c:
+  debug: true
+
 movement:
   speed: 400.0
   acceleration: 20.0

+ 27 - 5
robot_control/src/api/i2c_handler.py

@@ -1,11 +1,11 @@
-import smbus2 as smbus 
 from abc import ABC, abstractmethod
-from time import sleep
 import asyncio
+import random
 
 class I2CDevice(ABC):
+    default_address = 0x08
     @abstractmethod
-    def initialize(self, bus: smbus.SMBus, address: int):
+    def initialize(self, address: int):
         pass
 
     @abstractmethod
@@ -25,9 +25,8 @@ class I2C:
         self.device = None
 
     def initialize(self):
-        bus = smbus.SMBus(1)
         self.device:I2CDevice = self.device_class()
-        self.device.initialize(bus, self.address)
+        self.device.initialize(self.address)
         
     async def read_channel(self, channel):
         if self.device is None:
@@ -41,6 +40,29 @@ class I2C:
         
         # Needs to be sequential otherwise readings will be mixedup
         return {channel: await self.read_channel(channel) for channel in self.channels}
+    
+class MockI2C(I2CDevice):
+    def __init__(self):
+        self.channels = {
+            1: 0.3,  # Generic sensor
+            3: 0.8,  # Tank sensor
+            4: 0.8   # Endeffector sensor
+        }
+        self.current_channel = 1
+
+    def initialize(self, address):
+        pass
+
+    def config_channel(self, channel: int, gain=1, resolution=12):
+        if self.current_channel not in self.channels:
+            raise ValueError(f"Invalid channel {channel}")
+        self.current_channel = channel
+
+    def read_value(self) -> float:        
+        # Add some noise to the readings
+        base_value = self.channels[self.current_channel]
+        noise = random.uniform(-0.05, 0.05)
+        return base_value + noise
 
 
 

+ 1 - 1
robot_control/src/robot/controller.py

@@ -131,7 +131,7 @@ class RobotController:
     
     async def process_finished_measurement(self):
         if not self.work_queue:
-            logger.warning("No finished measurements in queue")
+            logger.info("No finished measurements in queue")
             return
         
         measurement_result = self.work_queue.pop(0)

+ 9 - 3
robot_control/src/robot/pump_controller.py

@@ -17,10 +17,16 @@ class PumpController:
         self.pump_active = False
         self.gpio = gpio
 
+    def _convert_to_bar(self, value: float) -> float:
+        """Convert sensor value (0-1.6) to bar (-1 to 1)"""
+        return (value - 0.8) * 2.5
+
     def check_endeffector_state(self, value) -> bool:
-        return value < self.gripping_threshold        
+        pressure_bar = self._convert_to_bar(value)
+        return pressure_bar < self.gripping_threshold        
 
     def handle_tank_reading(self, value):
+        pressure_bar = self._convert_to_bar(value)
         current_time = time.time()
 
         # Check if pump needs to be deactivated due to watchdog timeout
@@ -33,14 +39,14 @@ class PumpController:
             return
 
         # Activate pump if pressure is above the threshold (lower value is more vacuum)
-        if not self.pump_active and value > self.pressure_threshold_on:
+        if not self.pump_active and pressure_bar > self.pressure_threshold_on:
             self.logger.info("Activating pump due to pressure threshold")
             self.gpio.set_pin(self.pump_pin, 1)  # Turn on pump
             self.pump_active = True
             self.pump_last_activated = current_time
 
         # Deactivate pump if pressure is below the threshold
-        elif self.pump_active and value < self.pressure_threshold_off:
+        elif self.pump_active and pressure_bar < self.pressure_threshold_off:
             self.logger.info("Deactivating pump due to pressure threshold")
             self.gpio.set_pin(self.pump_pin, 0)  # Turn off pump
             self.pump_active = False

+ 13 - 0
robot_control/src/utils/config.py

@@ -52,6 +52,15 @@ class VisionConfig(BaseModel):
     frame_rate: int = Field(default=30, ge=1)
     exposure: float = Field(default=0.1, ge=0.0)
     gain: float = Field(default=1.0, ge=0.0)
+    bbox: Tuple[int, int, int, int] = Field(default=(0, 0, 640, 480)) # (x, y, width, height)
+
+    @model_validator(mode='after')
+    def validate_bbox(self) -> 'VisionConfig':
+        if self.bbox[0] + self.bbox[2] > self.resolution[0]:
+            raise ValueError("Bounding box exceeds image width")
+        if self.bbox[1] + self.bbox[3] > self.resolution[1]:
+            raise ValueError("Bounding box exceeds image height")
+        return self
 
 class VacuumConfig(BaseModel):
     min_pressure_bar: float = Field(default=0.5, ge=-1.0, le=1.0)
@@ -71,6 +80,9 @@ class GPIOConfig(BaseModel):
     pump_pin: int = Field(default=17, ge=0)
     valve_pin: int = Field(default=27, ge=0)
 
+class I2CConfig(BaseModel):
+    debug: bool = False
+
 class MovementConfig(BaseModel):
     speed: float = Field(default=400.0, ge=0.0)
     acceleration: float = Field(default=20.0, ge=0.0)
@@ -100,6 +112,7 @@ class RobotConfig(BaseModel):
     vision: VisionConfig
     vacuum: VacuumConfig
     gpio: GPIOConfig
+    i2c: I2CConfig
     movement: MovementConfig
     logging: LoggingConfig
 

+ 5 - 0
robot_control/src/utils/logging.py

@@ -7,6 +7,11 @@ import time
 
 THROTTLING_FILTER_ENABLED = False
 
+# TODO [SG]: Currently not used due to multiple problems:
+# No individual logger naming for each module
+# Throttling filtered too much
+# Coul not be created without config
+
 class LoggerSingleton:
     _instance: Optional[logging.Logger] = None
 

+ 1 - 1
robot_control/src/vendor/mcp3428.py

@@ -33,7 +33,7 @@ class MCP3428(I2CDevice):
     default_address = MCP3428_DEFAULT_ADDRESS
 
     def initialize(self, bus: smbus.SMBus, address: int):
-        self.bus = bus
+        self.bus = smbus.SMBus(1)
         self.address = address
 
     def config_channel(self, channel: int, gain=1, resolution=12):

+ 11 - 4
robot_control/src/vision/datamatrix.py

@@ -2,6 +2,7 @@ import cv2
 from pylibdmtx.pylibdmtx import decode
 import logging
 from pathlib import Path
+from robot_control.src.utils.config import VisionConfig
 
 logger = logging.getLogger(__name__)
 
@@ -42,9 +43,10 @@ class TestCamera:
         pass
 
 class DataMatrixReader:
-    def __init__(self, camera_id: int = 0):
-        self.camera_id = camera_id
+    def __init__(self, config: VisionConfig):
+        self.camera_id = config.camera_id
         self.camera = None
+        self.bbox = config.bbox
 
     def initialize(self):
         if self.camera_id < 0:
@@ -58,12 +60,17 @@ class DataMatrixReader:
         if not ret:
             raise Exception("Failed to capture image")
         
+        if self.bbox:
+            x, y, w, h = self.bbox
+            frame = frame[y:y+h, x:x+w]
+        
         decoded = decode(frame)
         if not decoded:
-            logger.warning("No datamatrix found")
+            logger.info("No datamatrix found")
             return None
             
-        return decoded[0].data.decode('utf-8')
+        decoded_str = decoded[0].data.decode('utf-8')
+        return decoded_str
 
     def cleanup(self):
         if self.camera:

二進制
tests/files/test_matrix.jpg


二進制
tests/files/test_matrix.png


二進制
tests/files/test_matrix_fail.jpg


二進制
tests/files/test_matrix_fail.png


+ 29 - 5
tests/test_vision.py

@@ -1,12 +1,36 @@
 import pytest
 from robot_control.src.vision.datamatrix import DataMatrixReader
+from robot_control.src.utils.config import VisionConfig
 
 @pytest.fixture
-def datamatrix_reader():
-    return DataMatrixReader(-1)
+def vision_config_mock():
+    return VisionConfig(
+        camera_id=-1,
+        resolution=(640, 480),
+        frame_rate=30,
+        exposure=100,
+        gain=1.0,
+        bbox=(0, 0, 640, 480)
+    )
+
+@pytest.fixture
+def vision_config_real():
+    return VisionConfig(
+        camera_id=0,
+        resolution=(640, 480),
+        frame_rate=30,
+        exposure=100,
+        gain=1.0,
+        bbox=(0, 0, 640, 480)
+    )
+
+@pytest.fixture
+def datamatrix_reader(vision_config_mock):
+    return DataMatrixReader(vision_config_mock)
+
 @pytest.fixture
-def datamatrix_reader_realcam():
-    return DataMatrixReader()
+def datamatrix_reader_realcam(vision_config_real):
+    return DataMatrixReader(vision_config_real)
 
 class TestDataMatrixReader:
     def test_init_reader(self, datamatrix_reader_realcam: DataMatrixReader):
@@ -15,7 +39,7 @@ class TestDataMatrixReader:
     def test_read_success(self, datamatrix_reader: DataMatrixReader):
         datamatrix_reader.initialize()
         result = datamatrix_reader.read_datamatrix()
-        assert result == "https://batteries.up-cell.de/cells/2224"
+        assert result == "60"
 
     def test_read_failure(self, datamatrix_reader: DataMatrixReader):
         datamatrix_reader.initialize()