from pathlib import Path
from typing import List, Dict, Tuple, Optional

from pydantic import BaseModel, Field, model_validator, field_validator
import yaml

# This module defines the configuration schema for the robot control system.
# It uses Pydantic models to validate and structure all configuration options,
# including hardware pin assignments, device positions, magazine layouts, and
# operational parameters. The configuration is loaded from a YAML file and
# provides a single source of truth for all system components.


class SlotConfig(BaseModel):
    """A single slot of a measurement device."""

    position: Tuple[float, float, float]
    occupied: bool = False
    slot_id: int
    # Filled in by ConfigParser._load_config from the owning device's ``id``.
    device_id: Optional[str] = None
    cell_id: Optional[int] = None

    def __str__(self) -> str:
        return f"Slot {self.slot_id}, Device: {self.device_id}"


class DeviceConfig(BaseModel):
    """A measurement device together with the slots it provides."""

    id: str
    position: Tuple[float, float, float]
    slots: List[SlotConfig]

    @field_validator('slots')
    @classmethod
    def validate_slots(cls, v: List[SlotConfig]) -> List[SlotConfig]:
        """Reject devices in which two slots share the same ``slot_id``."""
        slot_ids = [slot.slot_id for slot in v]
        if len(slot_ids) != len(set(slot_ids)):
            raise ValueError("Duplicate slot IDs found")
        return v


class MagDistPosition(BaseModel):
    """Linear position and rotation of the magazine distributor."""

    pos_mm: float = Field(default=0.0, ge=0, le=700)
    rot_deg: float = Field(default=0.0, ge=-180.0, le=180.0)


class MagazineConfig(BaseModel):
    """Base description of a magazine: distributor position and capacity."""

    mag_pos: MagDistPosition
    max_num_cells: int = Field(default=10, ge=0)


class DefeederMagazineConfig(MagazineConfig):
    """A named defeeder magazine associated with a health range."""

    name: str
    health_range: Tuple[int, int] = Field(default=(0, 100))

    @field_validator('health_range')
    @classmethod
    def validate_health_range(cls, v: Tuple[int, int]) -> Tuple[int, int]:
        """Require ``0 <= min < max <= 100`` for the ``(min, max)`` tuple."""
        min_val = 0
        max_val = 100
        if not (min_val <= v[0] < v[1] <= max_val):
            raise ValueError(
                f"health_range must be a tuple (min, max) with "
                f"{min_val} <= min < max <= {max_val}"
            )
        return v


class FeederConfig(BaseModel):
    """Feeder settings: robot position, distributor position and limits."""

    robot_pos: Tuple[float, float, float]
    mag_pos: MagDistPosition
    min_voltage: float = Field(default=2.0, ge=0.0)
    max_num_cells: int = Field(default=10, ge=0)


class DefeederConfig(BaseModel):
    """Defeeder settings: robot position, distributor position and capacity."""

    robot_pos: Tuple[float, float, float]
    mag_pos: MagDistPosition
    max_num_cells: int = Field(default=10, ge=0)


class MagDistributorConfig(BaseModel):
    """Motion parameters of the magazine distributor."""

    debug: Optional[bool] = False
    max_speed_mmmin: int = Field(default=100, ge=0)
    home_speed_mmmin: int = Field(default=50, ge=0)
    length_mm: float = Field(default=800.0, ge=0.0)


class MQTTConfig(BaseModel):
    """MQTT broker connection settings."""

    broker: str = "localhost"
    port: int = Field(default=1883, ge=1, le=65535)
    username: Optional[str] = None
    password: Optional[str] = None
    keepalive: int = Field(default=60, ge=0)


class GRBLConfig(BaseModel):
    """Serial connection settings for the GRBL controller."""

    # NOTE(review): the default "debug" presumably selects a simulated port;
    # confirm against the GRBL driver.
    port: str = "debug"
    baudrate: int = Field(default=115200, ge=9600)


class VisionConfig(BaseModel):
    """Camera settings and the region of interest within the image."""

    camera_id: int = 0
    resolution: Tuple[int, int] = (640, 480)
    frame_rate: int = Field(default=30, ge=1)
    exposure: float = Field(default=0.1, ge=0.0)
    gain: float = Field(default=1.0, ge=0.0)
    bbox: Tuple[int, int, int, int] = Field(default=(0, 0, 640, 480))  # (x, y, width, height)

    @model_validator(mode='after')
    def validate_bbox(self) -> 'VisionConfig':
        """Ensure the bounding box lies entirely within the image.

        Raises:
            ValueError: If any bbox component is negative, or if the box
                extends past the configured resolution.
        """
        # A negative origin or size can still satisfy the sum checks below
        # (e.g. x=-10, width=640) while lying outside the image.
        if any(component < 0 for component in self.bbox):
            raise ValueError("Bounding box values must be non-negative")
        if self.bbox[0] + self.bbox[2] > self.resolution[0]:
            raise ValueError("Bounding box exceeds image width")
        if self.bbox[1] + self.bbox[3] > self.resolution[1]:
            raise ValueError("Bounding box exceeds image height")
        return self


class VacuumConfig(BaseModel):
    """Pressure limits and timeouts for the vacuum system."""

    min_pressure_bar: float = Field(default=0.5, ge=-1.0, le=1.0)
    max_pressure_bar: float = Field(default=1.0, ge=-1.0, le=1.0)
    max_pump_time_s: float = Field(default=30.0, ge=0.0)
    gripping_threshold_bar: float = Field(default=0.8, ge=-1.0, le=1.0)
    pump_watchdog_timeout_s: float = Field(default=30.0, ge=0.0)

    @model_validator(mode='after')
    def validate_pressure(self) -> 'VacuumConfig':
        """Require ``min_pressure_bar`` to be strictly below ``max_pressure_bar``."""
        if self.min_pressure_bar >= self.max_pressure_bar:
            raise ValueError("min_pressure must be less than max_pressure")
        return self


class GPIOConfig(BaseModel):
    """GPIO pin assignments.

    Every pin is constrained to the range -1..27. Pins declared without a
    default are required; the ``*_en_pin`` fields default to 0.
    """

    # NOTE(review): -1 presumably marks a pin as unused; confirm with the
    # GPIO driver.
    debug: bool = False
    pump_pin: int = Field(ge=-1, le=27)
    valve_pin: int = Field(ge=-1, le=27)
    probe_pin: int = Field(ge=-1, le=27)
    measure_dir_pin: int = Field(ge=-1, le=27)
    measure_step_pin: int = Field(ge=-1, le=27)
    measure_en_pin: int = Field(default=0, ge=-1, le=27)
    mag_dist_pos_dir_pin: int = Field(ge=-1, le=27)
    mag_dist_pos_step_pin: int = Field(ge=-1, le=27)
    mag_dist_pos_en_pin: int = Field(default=0, ge=-1, le=27)
    mag_dist_pos_limit_pin: int = Field(ge=-1, le=27)
    mag_dist_rot_dir_pin: int = Field(ge=-1, le=27)
    mag_dist_rot_step_pin: int = Field(ge=-1, le=27)
    mag_dist_rot_en_pin: int = Field(default=0, ge=-1, le=27)
    mag_dist_rot_limit_pin: int = Field(ge=-1, le=27)
    mag_dist_sensor_pin: int = Field(ge=-1, le=27)


class I2CConfig(BaseModel):
    """I2C settings (currently only a debug flag)."""

    debug: bool = False


class MovementConfig(BaseModel):
    """General robot movement parameters."""

    speed_mmmin: int = Field(default=400, ge=0)
    safe_height: float = Field(default=25.0, ge=0.0)


class LoggingConfig(BaseModel):
    """Logging level, log file location, rotation and console settings."""

    level: str = Field(default="INFO")
    file_path: Path = Field(default=Path("logs/robot.log"))
    max_file_size_mb: int = Field(default=1, ge=0)
    backup_count: int = Field(default=3, ge=0)
    console_output: bool = True

    @field_validator('level')
    @classmethod
    def validate_log_level(cls, v: str) -> str:
        """Accept a level name case-insensitively and return it upper-cased."""
        valid_levels = ['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL']
        if v.upper() not in valid_levels:
            raise ValueError(f"Log level must be one of {valid_levels}")
        return v.upper()


class RobotConfig(BaseModel):
    """Top-level configuration aggregating every subsystem section."""

    measurement_devices: List[DeviceConfig]
    feeder: FeederConfig
    defeeder: DefeederConfig
    feeder_magazines: List[MagazineConfig] = Field(default_factory=list)
    defeeder_magazines: List[DefeederMagazineConfig] = Field(default_factory=list)
    mag_distributor: MagDistributorConfig
    mqtt: MQTTConfig
    grbl: GRBLConfig
    vision: VisionConfig
    vacuum: VacuumConfig
    gpio: GPIOConfig
    i2c: I2CConfig
    movement: MovementConfig
    logging: LoggingConfig

    @model_validator(mode='after')
    def check_lists(self) -> 'RobotConfig':
        """Require at least one measurement device and one magazine of each kind."""
        if not self.measurement_devices:
            raise ValueError("measurement_devices list must not be empty")
        if not self.feeder_magazines:
            raise ValueError("feeder_magazines list must not be empty")
        if not self.defeeder_magazines:
            raise ValueError("defeeder_magazines list must not be empty")
        return self


class ConfigParser:
    """Load a YAML configuration file and expose it as a validated RobotConfig."""

    def __init__(self, config_path: str = "robot_control/config/config.yaml"):
        """Load and validate the configuration found at ``config_path``.

        Raises:
            FileNotFoundError: If ``config_path`` does not exist.
            ValueError: If the file is empty, is not a YAML mapping, or fails
                schema validation (Pydantic's ValidationError is a ValueError).
        """
        self.config_path = Path(config_path)
        self._config = self._load_config()

    def _load_config(self) -> RobotConfig:
        """Read the YAML file, inject per-slot device IDs and build the config."""
        if not self.config_path.exists():
            raise FileNotFoundError(f"Config file not found: {self.config_path}")

        # Explicit encoding so parsing does not depend on the platform locale.
        with open(self.config_path, 'r', encoding='utf-8') as f:
            config_dict = yaml.safe_load(f)

        # safe_load returns None for an empty document and may return a list or
        # scalar; fail with a clear message instead of an opaque TypeError.
        if not isinstance(config_dict, dict):
            raise ValueError(
                f"Config file must contain a YAML mapping at the top level: "
                f"{self.config_path}"
            )

        # Set device_id for each slot before creating the config. Malformed or
        # missing entries are skipped here so that RobotConfig validation
        # reports them rather than a bare KeyError.
        devices = config_dict.get('measurement_devices')
        if isinstance(devices, list):
            for device in devices:
                if not isinstance(device, dict):
                    continue
                slots = device.get('slots')
                if not isinstance(slots, list):
                    continue
                device_id = device.get('id')
                for slot in slots:
                    if isinstance(slot, dict):
                        slot['device_id'] = device_id

        # Set default values for feeder and defeeder magazines
        config_dict.setdefault('feeder_magazines', [])
        config_dict.setdefault('defeeder_magazines', [])

        return RobotConfig(**config_dict)

    @property
    def config(self) -> RobotConfig:
        """The validated configuration loaded at construction time."""
        return self._config