"""Pydantic models describing the robot configuration, plus a YAML loader."""

from pathlib import Path
from typing import Dict, List, Optional, Tuple

from pydantic import BaseModel, Field, field_validator, model_validator
import yaml


class SlotConfig(BaseModel):
    """A single slot belonging to a measurement device."""

    # Three-component coordinate; presumably (x, y, z) -- confirm with caller.
    position: Tuple[float, float, float]
    occupied: bool = False
    slot_id: int
    # Overwritten by ConfigParser with the ``id`` of the owning device.
    device_id: Optional[str] = None
    cell_id: Optional[str] = None

    def __str__(self) -> str:
        """Return a short human-readable label for the slot."""
        return f"Slot {self.slot_id}, Device: {self.device_id}"


class DeviceConfig(BaseModel):
    """A measurement device and the slots it exposes."""

    id: str
    position: Tuple[float, float, float]
    slots: List[SlotConfig]

    @field_validator('slots')
    @classmethod
    def validate_slots(cls, v: List[SlotConfig]) -> List[SlotConfig]:
        """Reject a device whose slots do not all have distinct ``slot_id``s.

        Raises:
            ValueError: If two or more slots share the same ``slot_id``.
        """
        slot_ids = [slot.slot_id for slot in v]
        if len(slot_ids) != len(set(slot_ids)):
            raise ValueError("Duplicate slot IDs found")
        return v


class FeederConfig(BaseModel):
    """Feeder location together with its approach position."""

    position: Tuple[float, float, float]
    approach_position: Tuple[float, float, float]


class DropoffGradeConfig(BaseModel):
    """A drop-off location associated with a capacity threshold in [0, 1]."""

    id: str
    position: Tuple[float, float, float]
    capacity_threshold: float = Field(ge=0.0, le=1.0)


class MQTTConfig(BaseModel):
    """MQTT connection settings; the port must be within 1..65535."""

    broker: str = "localhost"
    port: int = Field(default=1883, ge=1, le=65535)
    username: Optional[str] = None
    password: Optional[str] = None
    keepalive: int = Field(default=60, ge=0)


class GRBLConfig(BaseModel):
    """Serial settings for the GRBL connection."""

    # NOTE(review): the default "debug" looks like a sentinel for running
    # without hardware -- confirm against the code that consumes it.
    port: str = "debug"
    baudrate: int = Field(default=115200, ge=9600)


class VisionConfig(BaseModel):
    """Camera selection and capture parameters."""

    camera_id: int = 0
    resolution: Tuple[int, int] = (640, 480)
    frame_rate: int = Field(default=30, ge=1)
    exposure: float = Field(default=0.1, ge=0.0)
    gain: float = Field(default=1.0, ge=0.0)


class VacuumConfig(BaseModel):
    """Vacuum pressure limits and pump timing.

    All pressure fields are constrained to [-1.0, 1.0] and the time fields
    must be non-negative.
    """

    min_pressure_bar: float = Field(default=0.5, ge=-1.0, le=1.0)
    max_pressure_bar: float = Field(default=1.0, ge=-1.0, le=1.0)
    max_pump_time_s: float = Field(default=30.0, ge=0.0)
    gripping_threshold_bar: float = Field(default=0.8, ge=-1.0, le=1.0)
    pump_watchdog_timeout_s: float = Field(default=30.0, ge=0.0)

    @model_validator(mode='after')
    def validate_pressure(self) -> 'VacuumConfig':
        """Ensure ``min_pressure_bar`` is strictly below ``max_pressure_bar``.

        Raises:
            ValueError: If the minimum is greater than or equal to the maximum.
        """
        if self.min_pressure_bar >= self.max_pressure_bar:
            raise ValueError("min_pressure must be less than max_pressure")
        return self


class GPIOConfig(BaseModel):
    """GPIO pin numbers for the pump and valve, plus a debug switch."""

    debug: bool = False
    pump_pin: int = Field(default=17, ge=0)
    valve_pin: int = Field(default=27, ge=0)


class MovementConfig(BaseModel):
    """Motion parameters; every value must be non-negative."""

    speed: float = Field(default=400.0, ge=0.0)
    acceleration: float = Field(default=20.0, ge=0.0)
    safe_height: float = Field(default=25.0, ge=0.0)


class LoggingConfig(BaseModel):
    """Logging level, log-file location/rotation limits and console toggle."""

    level: str = Field(default="INFO")
    file_path: Path = Field(default=Path("logs/robot.log"))
    max_file_size_mb: int = Field(default=1, ge=0)
    backup_count: int = Field(default=3, ge=0)
    console_output: bool = True

    @field_validator('level')
    @classmethod
    def validate_log_level(cls, v: str) -> str:
        """Validate the level case-insensitively and normalise to upper case.

        Raises:
            ValueError: If the level is not one of the accepted names.
        """
        valid_levels = ['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL']
        normalised = v.upper()
        if normalised not in valid_levels:
            raise ValueError(f"Log level must be one of {valid_levels}")
        return normalised


class RobotConfig(BaseModel):
    """Top-level configuration aggregating every subsystem section."""

    measurement_devices: List[DeviceConfig]
    feeder: FeederConfig
    dropoff_grades: List[DropoffGradeConfig]
    mqtt: MQTTConfig
    grbl: GRBLConfig
    vision: VisionConfig
    vacuum: VacuumConfig
    gpio: GPIOConfig
    movement: MovementConfig
    logging: LoggingConfig

    @model_validator(mode='after')
    def validate_dropoff_grades(self) -> 'RobotConfig':
        """Sort drop-off grades by descending threshold and reject ties.

        The grades are stored back on the model in sorted order so that
        consumers always see them from highest to lowest threshold,
        regardless of the order used in the config file.

        Raises:
            ValueError: If two grades share the same ``capacity_threshold``.
        """
        sorted_grades = sorted(
            self.dropoff_grades,
            key=lambda grade: grade.capacity_threshold,
            reverse=True,
        )
        # The list is already sorted in descending order, so the only way a
        # neighbouring pair can fail strict ordering is by being equal.
        for higher, lower in zip(sorted_grades, sorted_grades[1:]):
            if higher.capacity_threshold <= lower.capacity_threshold:
                raise ValueError(
                    f"Dropoff grade thresholds must be in strictly descending order. Found: "
                    f"Grade {higher.id} ({higher.capacity_threshold}) <= "
                    f"Grade {lower.id} ({lower.capacity_threshold})"
                )
        self.dropoff_grades = sorted_grades
        return self


class ConfigParser:
    """Load a YAML file and expose it as a validated ``RobotConfig``."""

    def __init__(self, config_path: str = "robot_control/config/config.yaml"):
        """Read and validate the configuration at ``config_path``.

        Raises:
            FileNotFoundError: If ``config_path`` does not exist.
            ValueError: If the file is empty, is not a YAML mapping, or fails
                model validation (Pydantic's ``ValidationError`` is a
                ``ValueError`` subclass).
        """
        self.config_path = Path(config_path)
        self._config = self._load_config()

    @staticmethod
    def _inject_device_ids(config_dict: dict) -> None:
        """Copy each device's ``id`` into its slots as ``device_id``.

        Entries with an unexpected shape are left untouched so that model
        validation reports them with a proper error instead of this helper
        failing with ``KeyError``/``TypeError``.
        """
        devices = config_dict.get('measurement_devices')
        if not isinstance(devices, list):
            return
        for device in devices:
            if not isinstance(device, dict):
                continue
            slots = device.get('slots')
            if not isinstance(slots, list):
                continue
            device_id = device.get('id')
            for slot in slots:
                if isinstance(slot, dict):
                    slot['device_id'] = device_id

    def _load_config(self) -> RobotConfig:
        """Parse the YAML file and build the validated ``RobotConfig``.

        Raises:
            FileNotFoundError: If the config file does not exist.
            ValueError: If the document is empty or not a mapping, or if the
                content fails model validation.
        """
        if not self.config_path.exists():
            raise FileNotFoundError(f"Config file not found: {self.config_path}")

        with self.config_path.open('r', encoding='utf-8') as f:
            config_dict = yaml.safe_load(f)

        # safe_load returns None for an empty document and may return a list
        # or scalar; fail clearly instead of crashing on indexing below.
        if not isinstance(config_dict, dict):
            raise ValueError(
                f"Config file must contain a YAML mapping at the top level: "
                f"{self.config_path}"
            )

        # Set device_id for each slot before creating the config.
        self._inject_device_ids(config_dict)

        return RobotConfig.model_validate(config_dict)

    @property
    def config(self) -> RobotConfig:
        """The validated configuration loaded at construction time."""
        return self._config