| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205 |
- from pathlib import Path
- from typing import List, Dict, Tuple, Optional
- from pydantic import BaseModel, Field, model_validator, field_validator
- import yaml
- # This module defines the configuration schema for the robot control system.
- # It uses Pydantic models to validate and structure all configuration options,
- # including hardware pin assignments, device positions, magazine layouts, and
- # operational parameters. The configuration is loaded from a YAML file and
- # provides a single source of truth for all system components.
class SlotConfig(BaseModel):
    """One slot of a measurement device.

    ``position`` and ``slot_id`` are required; the remaining fields are
    optional bookkeeping and default to an empty, unassigned slot.
    """

    # Three float coordinates of the slot.
    # NOTE(review): presumably (x, y, z) in robot coordinates -- confirm.
    position: Tuple[float, float, float]
    # Whether the slot currently holds something.
    occupied: bool = False
    # Identifier of the slot; must be unique within its device.
    slot_id: int
    # Identifier of the owning device; filled in by the config loader.
    device_id: Optional[str] = None
    # Identifier of the cell held in the slot, if any.
    cell_id: Optional[int] = None

    def __str__(self) -> str:
        """Return a short human-readable label for the slot."""
        return f"Slot {self.slot_id}, Device: {self.device_id}"
class DeviceConfig(BaseModel):
    """A measurement device together with the slots it provides."""

    # Identifier of the device.
    id: str
    # Three float coordinates of the device.
    position: Tuple[float, float, float]
    # Slots that belong to this device.
    slots: List[SlotConfig]

    @field_validator('slots')
    @classmethod
    def validate_slots(cls, v):
        """Reject a slot list in which the same ``slot_id`` occurs twice.

        Raises:
            ValueError: if any two slots share a ``slot_id``.
        """
        seen_ids = set()
        for slot in v:
            if slot.slot_id in seen_ids:
                raise ValueError("Duplicate slot IDs found")
            seen_ids.add(slot.slot_id)
        return v
class MagDistPosition(BaseModel):
    """Target pose of the magazine distributor: a linear position and a rotation."""

    # Linear position, restricted to the range 0..700.
    # NOTE(review): the field name suggests millimetres -- confirm.
    pos_mm: float = Field(default=0.0, ge=0, le=700)
    # Rotation, restricted to the range -180..180.
    # NOTE(review): the field name suggests degrees -- confirm.
    rot_deg: float = Field(default=0.0, ge=-180.0, le=180.0)
class MagazineConfig(BaseModel):
    """Base description of a magazine: where it sits and how many cells it holds."""

    # Distributor pose associated with this magazine (required).
    mag_pos: MagDistPosition
    # Capacity of the magazine; must not be negative.
    max_num_cells: int = Field(default=10, ge=0)
class DefeederMagazineConfig(MagazineConfig):
    """Magazine on the defeeder side, identified by name and a health range."""

    # Name of the magazine (required).
    name: str
    # (min, max) health bounds accepted by this magazine.
    health_range: Tuple[int, int] = Field(default=(0, 100))

    @field_validator('health_range')
    @classmethod
    def validate_health_range(cls, v):
        """Ensure the range is strictly increasing and stays within 0..100.

        Raises:
            ValueError: unless ``0 <= min < max <= 100``.
        """
        min_val, max_val = 0, 100
        lower, upper = v
        # Negated form of ``min_val <= lower < upper <= max_val``.
        if lower < min_val or lower >= upper or upper > max_val:
            raise ValueError(f"health_range must be a tuple (min, max) with {min_val} <= min < max <= {max_val}")
        return v
class FeederConfig(BaseModel):
    """Settings for the feeder station."""

    # Three float coordinates used by the robot at the feeder (required).
    robot_pos: Tuple[float, float, float]
    # Distributor pose associated with the feeder (required).
    mag_pos: MagDistPosition
    # Lower voltage bound; must not be negative.
    # NOTE(review): presumably the minimum acceptable cell voltage -- confirm.
    min_voltage: float = Field(default=2.0, ge=0.0)
    # Capacity of the feeder; must not be negative.
    max_num_cells: int = Field(default=10, ge=0)
class DefeederConfig(BaseModel):
    """Settings for the defeeder station."""

    # Three float coordinates used by the robot at the defeeder (required).
    robot_pos: Tuple[float, float, float]
    # Distributor pose associated with the defeeder (required).
    mag_pos: MagDistPosition
    # Capacity of the defeeder; must not be negative.
    max_num_cells: int = Field(default=10, ge=0)
class MagDistributorConfig(BaseModel):
    """Motion limits and debug switch for the magazine distributor."""

    # Debug switch; ``None`` is accepted as well as a boolean.
    debug: Optional[bool] = False
    # Upper speed limit; must not be negative.
    # NOTE(review): the suffix suggests mm/min -- confirm.
    max_speed_mmmin: int = Field(default=100, ge=0)
    # Speed used while homing; must not be negative.
    home_speed_mmmin: int = Field(default=50, ge=0)
    # Length of the distributor axis; must not be negative.
    length_mm: float = Field(default=800.0, ge=0.0)
class MQTTConfig(BaseModel):
    """Connection parameters for the MQTT broker."""

    # Host name or address of the broker.
    broker: str = "localhost"
    # TCP port of the broker; restricted to 1..65535.
    port: int = Field(default=1883, ge=1, le=65535)
    # Optional credentials; both default to ``None``.
    username: Optional[str] = None
    password: Optional[str] = None
    # Keepalive value; must not be negative.
    # NOTE(review): presumably seconds -- confirm.
    keepalive: int = Field(default=60, ge=0)
class GRBLConfig(BaseModel):
    """Serial connection settings for the GRBL controller."""

    # Port name; defaults to the literal string "debug".
    # NOTE(review): "debug" presumably selects a simulated connection -- confirm.
    port: str = "debug"
    # Baud rate; values below 9600 are rejected.
    baudrate: int = Field(default=115200, ge=9600)
class VisionConfig(BaseModel):
    """Camera settings and the bounding box used within the camera frame."""

    # Index of the camera to open.
    camera_id: int = 0
    # Image size as (width, height).
    resolution: Tuple[int, int] = (640, 480)
    # Frames per second; at least 1.
    frame_rate: int = Field(default=30, ge=1)
    # Exposure and gain; neither may be negative.
    exposure: float = Field(default=0.1, ge=0.0)
    gain: float = Field(default=1.0, ge=0.0)
    bbox: Tuple[int, int, int, int] = Field(default=(0, 0, 640, 480))  # (x, y, width, height)

    @model_validator(mode='after')
    def validate_bbox(self) -> 'VisionConfig':
        """Ensure the bounding box lies completely inside the image.

        Returns:
            The validated model instance.

        Raises:
            ValueError: if any bounding-box component is negative, or if the
                box extends past the right or bottom edge of ``resolution``.
        """
        x, y, box_width, box_height = self.bbox
        image_width, image_height = self.resolution

        # A negative offset or size can satisfy the edge checks below
        # (e.g. x=-100, width=50) while describing a region outside the
        # image, so it has to be rejected explicitly.
        if min(x, y, box_width, box_height) < 0:
            raise ValueError("Bounding box values must not be negative")
        if x + box_width > image_width:
            raise ValueError("Bounding box exceeds image width")
        if y + box_height > image_height:
            raise ValueError("Bounding box exceeds image height")
        return self
class VacuumConfig(BaseModel):
    """Pressure thresholds and timing limits for the vacuum system."""

    # Pressure bounds, each restricted to -1.0..1.0.
    min_pressure_bar: float = Field(default=0.5, ge=-1.0, le=1.0)
    max_pressure_bar: float = Field(default=1.0, ge=-1.0, le=1.0)
    # Pump run-time limit; must not be negative.
    max_pump_time_s: float = Field(default=30.0, ge=0.0)
    # Pressure threshold related to gripping, restricted to -1.0..1.0.
    gripping_threshold_bar: float = Field(default=0.8, ge=-1.0, le=1.0)
    # Timeout of the pump watchdog; must not be negative.
    pump_watchdog_timeout_s: float = Field(default=30.0, ge=0.0)

    @model_validator(mode='after')
    def validate_pressure(self) -> 'VacuumConfig':
        """Require the minimum pressure to be strictly below the maximum.

        Raises:
            ValueError: if ``min_pressure_bar`` is not less than
                ``max_pressure_bar``.
        """
        if self.max_pressure_bar <= self.min_pressure_bar:
            raise ValueError("min_pressure must be less than max_pressure")
        return self
class GPIOConfig(BaseModel):
    """GPIO pin assignments.

    Every pin is restricted to the range -1..27. Pins declared without a
    default must be present in the configuration; the three ``*_en_pin``
    fields default to 0.
    NOTE(review): -1 presumably marks an unused pin -- confirm with the
    GPIO driver.
    """

    debug: bool = False

    # Vacuum pump, valve and probe.
    pump_pin: int = Field(ge=-1, le=27)
    valve_pin: int = Field(ge=-1, le=27)
    probe_pin: int = Field(ge=-1, le=27)

    # Measurement axis: direction, step and enable.
    measure_dir_pin: int = Field(ge=-1, le=27)
    measure_step_pin: int = Field(ge=-1, le=27)
    measure_en_pin: int = Field(default=0, ge=-1, le=27)

    # Magazine distributor, linear axis: direction, step, enable and limit.
    mag_dist_pos_dir_pin: int = Field(ge=-1, le=27)
    mag_dist_pos_step_pin: int = Field(ge=-1, le=27)
    mag_dist_pos_en_pin: int = Field(default=0, ge=-1, le=27)
    mag_dist_pos_limit_pin: int = Field(ge=-1, le=27)

    # Magazine distributor, rotation axis: direction, step, enable and limit.
    mag_dist_rot_dir_pin: int = Field(ge=-1, le=27)
    mag_dist_rot_step_pin: int = Field(ge=-1, le=27)
    mag_dist_rot_en_pin: int = Field(default=0, ge=-1, le=27)
    mag_dist_rot_limit_pin: int = Field(ge=-1, le=27)

    # Magazine distributor sensor input.
    mag_dist_sensor_pin: int = Field(ge=-1, le=27)
class I2CConfig(BaseModel):
    """I2C settings; currently only a debug switch."""

    # Debug switch, off by default.
    debug: bool = False
class MovementConfig(BaseModel):
    """General movement parameters of the robot."""

    # Movement speed; must not be negative.
    # NOTE(review): the suffix suggests mm/min -- confirm.
    speed_mmmin: int = Field(default=400, ge=0)
    # Safe height; must not be negative.
    safe_height: float = Field(default=25.0, ge=0.0)
class LoggingConfig(BaseModel):
    """Logging settings: level, log file location, rotation and console output."""

    # Log level name; stored upper-cased after validation.
    level: str = Field(default="INFO")
    # Location of the log file.
    file_path: Path = Field(default=Path("logs/robot.log"))
    # Size limit of a log file and number of backups; neither may be negative.
    max_file_size_mb: int = Field(default=1, ge=0)
    backup_count: int = Field(default=3, ge=0)
    # Whether log output also goes to the console.
    console_output: bool = True

    @field_validator('level')
    @classmethod
    def validate_log_level(cls, v):
        """Check the level name case-insensitively and return it upper-cased.

        Raises:
            ValueError: if the name is not one of the five accepted levels.
        """
        valid_levels = ['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL']
        normalized = v.upper()
        if normalized in valid_levels:
            return normalized
        raise ValueError(f"Log level must be one of {valid_levels}")
class RobotConfig(BaseModel):
    """Root configuration model that aggregates every subsystem section."""

    # Devices and stations.
    measurement_devices: List[DeviceConfig]
    feeder: FeederConfig
    defeeder: DefeederConfig
    # The magazine lists default to empty but are rejected by ``check_lists``
    # when they stay empty.
    feeder_magazines: List[MagazineConfig] = Field(default_factory=list)
    defeeder_magazines: List[DefeederMagazineConfig] = Field(default_factory=list)
    mag_distributor: MagDistributorConfig

    # Communication and peripherals.
    mqtt: MQTTConfig
    grbl: GRBLConfig
    vision: VisionConfig
    vacuum: VacuumConfig
    gpio: GPIOConfig
    i2c: I2CConfig

    # Motion and logging.
    movement: MovementConfig
    logging: LoggingConfig

    @model_validator(mode='after')
    def check_lists(self) -> 'RobotConfig':
        """Require at least one measurement device and one magazine of each kind.

        Raises:
            ValueError: naming the first of the three lists that is empty.
        """
        # Checked in this order so the first empty list is the one reported.
        required_lists = (
            "measurement_devices",
            "feeder_magazines",
            "defeeder_magazines",
        )
        for list_name in required_lists:
            if not getattr(self, list_name):
                raise ValueError(f"{list_name} list must not be empty")
        return self
class ConfigParser:
    """Load a YAML configuration file and expose it as a validated ``RobotConfig``.

    The file is read and validated once, when the parser is constructed.
    """

    def __init__(self, config_path: str = "robot_control/config/config.yaml"):
        """Read and validate the configuration at ``config_path``.

        Args:
            config_path: Location of the YAML configuration file.

        Raises:
            FileNotFoundError: if ``config_path`` does not exist.
            ValueError: if the file does not contain a top-level mapping, or
                if schema validation fails.
        """
        self.config_path = Path(config_path)
        self._config = self._load_config()

    def _load_config(self) -> "RobotConfig":
        """Parse the YAML file and build the ``RobotConfig`` model from it.

        Returns:
            The validated configuration.

        Raises:
            FileNotFoundError: if the configuration file does not exist.
            ValueError: if the YAML document is empty or not a mapping.
        """
        if not self.config_path.exists():
            raise FileNotFoundError(f"Config file not found: {self.config_path}")

        # Decode explicitly as UTF-8 so the result does not depend on the
        # locale of the machine the robot software runs on.
        with open(self.config_path, 'r', encoding='utf-8') as f:
            config_dict = yaml.safe_load(f)

        # An empty file yields None and a bare list/scalar yields a non-dict;
        # fail with a clear message instead of a TypeError further down.
        if not isinstance(config_dict, dict):
            raise ValueError(
                f"Config file must contain a YAML mapping at the top level: {self.config_path}"
            )

        # Set device_id for each slot before creating the config. Malformed
        # entries are skipped here so that schema validation reports them
        # instead of a bare KeyError being raised first.
        devices = config_dict.get('measurement_devices')
        if isinstance(devices, list):
            for device in devices:
                if not isinstance(device, dict) or 'id' not in device:
                    continue
                slots = device.get('slots')
                if not isinstance(slots, list):
                    continue
                for slot in slots:
                    if isinstance(slot, dict):
                        slot['device_id'] = device['id']

        # Set default values for feeder and defeeder magazines
        config_dict.setdefault('feeder_magazines', [])
        config_dict.setdefault('defeeder_magazines', [])
        return RobotConfig(**config_dict)

    @property
    def config(self) -> "RobotConfig":
        """The configuration loaded when the parser was constructed."""
        return self._config
|