Forráskód Böngészése

ref: fix typing issues

Silas Gruen 6 hónapja
szülő
commit
58fc6471ab

+ 2 - 2
main.py

@@ -24,7 +24,7 @@ class LoaderSystem:
 
         self.logger = logging.getLogger(__name__)
         self.controller = RobotController(self.config, self.gpio)
-        self.vision = DataMatrixReader(self.config)
+        self.vision = DataMatrixReader(self.config.vision)
         self.logger.info("Initializing LoaderSystem")
         self.vision.initialize()
 
@@ -45,7 +45,7 @@ class LoaderSystem:
             )
         finally:
             self.cleanup()
-            logger.info("Cleaning up resources...")
+            self.logger.info("Cleaning up resources...")
 
     async def _poll_i2c_channels(self):
         while True:

+ 1 - 2
playgrounds/movement_playground.py

@@ -18,8 +18,7 @@ Can also use GPIO pins to control pump and valve.
 grbl_config = GRBLConfig(port="/dev/ttyACM0", baudrate=115200)
 test_config = MovementConfig(
         speed=1000,
-        safe_height=10,
-        work_height=0
+        safe_height=10
     )
 
 grbl_handler = GRBLHandler(

+ 3 - 0
robot_control/src/api/gpio.py

@@ -13,6 +13,9 @@ class GPIOInterface(ABC):
     @abstractmethod
     def get_pin(self, pin: int) -> int:
         pass
+    
+    def cleanup(self):
+        pass
 
 class PiGPIO(GPIOInterface):
     def __init__(self, in_pins: list=[], out_pins: list=[]):

+ 33 - 27
robot_control/src/api/grbl_handler.py

@@ -1,6 +1,6 @@
 import asyncio
 import serial_asyncio
-from typing import List
+from typing import List, Optional
 import logging
 import datetime
 import collections
@@ -21,8 +21,8 @@ class GRBLHandler:
         
         self.controller_active = asyncio.Event()
         
-        self.reader = None
-        self.writer = None
+        self.reader : Optional[asyncio.StreamReader] = None
+        self.writer : Optional[asyncio.StreamWriter] = None
         self.position_callbacks = []
 
     async def connect(self):
@@ -41,7 +41,7 @@ class GRBLHandler:
             raise serial_asyncio.serial.SerialException(f"Failed to connect to robot: {str(e)}")
 
         init_response = await self._process_response()
-        if not any(isinstance(response, collections.abc.Iterable) and "Grbl" in response for response in init_response):
+        if not any(isinstance(response, str) and "Grbl" in response for response in init_response):
             raise RuntimeError("Failed to connect to GRBL")
         
         logger.info("GRBL connected.")
@@ -87,6 +87,9 @@ class GRBLHandler:
         if self.debug:
             logger.debug(f"G-Code commands [{*commands,}] not sent in debug mode")
             return
+        if not self.writer:
+            logger.error("Writer is not initialized. Cannot send G-Code commands.")
+            return
 
         self.controller_active.set()
         try:
@@ -134,7 +137,7 @@ class GRBLHandler:
         
         return response_lines
 
-    async def wait_until_idle(self, timeout_s, position: list[float] = None):
+    async def wait_until_idle(self, timeout_s, position: Optional[tuple[float,float,float]] = None):
         """Wait until GRBL reports idle status
         
         Args:
@@ -167,7 +170,7 @@ class GRBLHandler:
                 
             await asyncio.sleep(0.5)  # Async delay to prevent flooding
 
-    async def send_and_wait_gcode(self, commands: List[str], timeout_s=60, position: list[float] = None):
+    async def send_and_wait_gcode(self, commands: List[str], timeout_s=60, position: Optional[tuple[float,float,float]] = None):
         """Send GCODE commands and wait until machine is idle"""
         await self.send_gcode(commands)
         await asyncio.sleep(0.2)  # Delay to allow GRBL to process commands
@@ -184,30 +187,33 @@ class GRBLHandler:
             for callback in self.position_callbacks:
                 callback(self.current_position)
 
-    async def _process_response(self, timeout_s=6.0):
+    async def _process_response(self, timeout_s=6.0) -> list[str]:
         """Process GRBL responses"""
-        if self.reader:
-            response_lines = []
-            first = True
-            while True:
-                try:
-                    current_timeout = timeout_s if first else 0.1
-                    line = await asyncio.wait_for(self.reader.readuntil(), timeout=current_timeout)
-                    decoded = line.strip().decode("utf-8")
-                    self._check_pos_callbacks(decoded)
-                    logger.debug(f"G-Code response: {decoded}")
-                    response_lines.append(decoded)
-                    first = False
-                except asyncio.TimeoutError:
-                    # No more data available right now
-                    break
-                except Exception as e:
-                    raise RuntimeError(f"Failed to process response: {e}")
-            
-            if not response_lines:
-                logger.warning(f"No GRBL response received! ({timeout_s}s)")
+        response_lines : list[str] = []
 
+        if not self.reader:
             return response_lines
+        
+        first = True
+        while True:
+            try:
+                current_timeout = timeout_s if first else 0.1
+                line = await asyncio.wait_for(self.reader.readuntil(), timeout=current_timeout)
+                decoded = line.strip().decode("utf-8")
+                self._check_pos_callbacks(decoded)
+                logger.debug(f"G-Code response: {decoded}")
+                response_lines.append(decoded)
+                first = False
+            except asyncio.TimeoutError:
+                # No more data available right now
+                break
+            except Exception as e:
+                raise RuntimeError(f"Failed to process response: {e}")
+        
+        if not response_lines:
+            logger.warning(f"No GRBL response received! ({timeout_s}s)")
+
+        return response_lines
 
     def register_position_callback(self, callback):
         """Register callback for position updates

+ 4 - 3
robot_control/src/api/i2c_handler.py

@@ -1,6 +1,7 @@
 from abc import ABC, abstractmethod
 import asyncio
 import random
+from typing import Optional
 
 class I2CDevice(ABC):
     default_address = 0x08
@@ -17,15 +18,15 @@ class I2CDevice(ABC):
         pass
 
 class I2C:
-    def __init__(self, device_class:I2CDevice, address=None, **kwargs):
+    def __init__(self, device_class:type[I2CDevice], address=None, **kwargs):
         self.device_class = device_class
         self.address = address if address is not None else device_class.default_address
         assert self.address is not None, "I2C address must be provided"
         self.kwargs = kwargs
-        self.device = None
+        self.device: Optional[I2CDevice] = None
 
     def initialize(self):
-        self.device:I2CDevice = self.device_class()
+        self.device = self.device_class()
         self.device.initialize(self.address)
         
     async def read_channel(self, channel):

+ 17 - 12
robot_control/src/robot/controller.py

@@ -1,6 +1,6 @@
 from dataclasses import dataclass
 from enum import Enum
-from typing import List, Tuple
+from typing import List, Tuple, Optional
 from robot_control.src.utils.config import RobotConfig, SlotConfig, DeviceConfig, DropoffGradeConfig
 from robot_control.src.robot.movement import RobotMovement
 import logging
@@ -20,12 +20,12 @@ class CellStatus(Enum):
 class Cell:
     id: int
     status: CellStatus = CellStatus.WAITING
-    capacity: float = None
+    capacity: float = 0
 
 class RobotController:
     def __init__(self, config: RobotConfig, gpio_handler: GPIOInterface):
         self.config = config
-        self.cells: dict[Cell] = {}
+        self.cells: dict[int, Cell] = {}
         self.devices = self.config.measurement_devices
         self.feeder = self.config.feeder
         self.dropoff_grades = self.config.dropoff_grades
@@ -104,30 +104,31 @@ class RobotController:
         await self.grbl_handler.close()
         self.mqtt_handler.cleanup()
 
-    def add_cell(self, cell_id: str):
+    def add_cell(self, cell_id: int) -> Cell:
         self.cells[cell_id] = Cell(cell_id)
         return self.cells[cell_id]
 
-    def get_cell_by_id(self, cell_id: str) -> Cell:
+    def get_cell_by_id(self, cell_id: int) -> Optional[Cell]:
         return self.cells.get(cell_id, None)
     
-    def get_device_by_id(self, device_id: str) -> DeviceConfig:
+    def get_device_by_id(self, device_id: str) -> Optional[DeviceConfig]:
         for device in self.devices:
             if device.id == device_id:
                 return device
         logger.error(f"Device {device_id} not found")
         return None
     
-    def get_slot_by_id(self, device_id: str, slot_id: int) -> Cell:
+    def get_slot_by_id(self, device_id: str, slot_id: int) -> Optional[SlotConfig]:
         try:
-            return self.get_device_by_id(device_id).slots[slot_id]
-        except AttributeError as e:
-            return None
+            device = self.get_device_by_id(device_id)
+            if not device:
+                return None
+            device.slots[slot_id]
         except IndexError as e:
             logger.error(f"Slot {slot_id} not found in device {device_id}")
             return None
 
-    def get_next_free_slot(self) -> SlotConfig:
+    def get_next_free_slot(self) -> Optional[SlotConfig]:
         for device in self.devices:
             for slot in device.slots:
                 if not slot.occupied:
@@ -221,9 +222,11 @@ class RobotController:
             raise RuntimeError("Gripper not occupied")
 
         # Move to slot position
+        if not slot.device_id:
+            raise RuntimeError(f"Device id not set!")
         slot_device = self.get_device_by_id(slot.device_id)
         if not slot_device:
-            raise RuntimeError(f"Device {slot.device_id} not found")
+            raise RuntimeError(f"Device {slot.device_id} not found!")
         pos = [sum(el) for el in zip(slot.position, slot_device.position)]
         safe_pos = (pos[0], pos[1], self.config.movement.safe_height)
         logger.info(f"Moving to slot position (safe) {safe_pos}...")
@@ -257,6 +260,8 @@ class RobotController:
             logger.error("Gripper already occupied")
             return None
         
+        if not slot.device_id:
+            raise RuntimeError(f"Device id not set!")            
         slot_device = self.get_device_by_id(slot.device_id)
         if not slot_device:
             raise RuntimeError(f"Device {slot.device_id} not found")

+ 1 - 1
robot_control/src/robot/movement.py

@@ -30,7 +30,7 @@ class RobotMovement:
         self.grbl = grbl
         self.grbl.register_position_callback(self._position_callback)
 
-    def _position_callback(self, pos: dict):
+    def _position_callback(self, pos: tuple[float, float, float]) -> None:
         """Update the current position from GRBL feedback.
 
         Args:

+ 1 - 1
robot_control/src/utils/config.py

@@ -8,7 +8,7 @@ class SlotConfig(BaseModel):
     occupied: bool = False
     slot_id: int
     device_id: Optional[str] = None
-    cell_id: Optional[str] = None
+    cell_id: Optional[int] = None
 
     def __str__(self) -> str:
         return f"Slot {self.slot_id}, Device: {self.device_id}"

+ 6 - 2
robot_control/src/vision/datamatrix.py

@@ -3,6 +3,7 @@ from pylibdmtx.pylibdmtx import decode
 import logging
 from pathlib import Path
 from robot_control.src.utils.config import VisionConfig
+from typing import Optional
 
 logger = logging.getLogger(__name__)
 
@@ -55,9 +56,12 @@ class DataMatrixReader:
         else:
             self.camera = cv2.VideoCapture(self.camera_id)
 
-    def read_datamatrix(self) -> str:
+    def read_datamatrix(self) -> Optional[str]:
+        if not self.camera:
+            raise Exception("No cam available")
+        
         ret, frame = self.camera.read()
-        if not ret:
+        if not ret or not frame:
             raise Exception("Failed to capture image")
         
         if self.bbox: