| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143 |
- from pathlib import Path
- from typing import List, Dict, Tuple, Optional
- from pydantic import BaseModel, Field, model_validator, field_validator
- import yaml
class SlotConfig(BaseModel):
    """One slot of a measurement device.

    ``position`` and ``slot_id`` are required; every other field has a
    default.
    """

    # Three floats; presumably (x, y, z) machine coordinates -- confirm units.
    position: Tuple[float, float, float]
    occupied: bool = False
    slot_id: int
    # ConfigParser fills this with the parent device's ``id`` while loading.
    device_id: Optional[str] = None
    cell_id: Optional[str] = None

    def __str__(self) -> str:
        """Return a short label naming the slot and the device it belongs to."""
        return f"Slot {self.slot_id}, Device: {self.device_id}"
class DeviceConfig(BaseModel):
    """A measurement device: an identifier, a position and its slots."""

    id: str
    # Three floats; presumably (x, y, z) machine coordinates -- confirm units.
    position: Tuple[float, float, float]
    slots: List[SlotConfig]

    @field_validator('slots')
    @classmethod
    def validate_slots(cls, v):
        """Ensure no two slots of this device share a ``slot_id``.

        Returns the slot list unchanged.

        Raises:
            ValueError: if any ``slot_id`` occurs more than once.
        """
        seen_ids = set()
        for slot in v:
            if slot.slot_id in seen_ids:
                raise ValueError("Duplicate slot IDs found")
            seen_ids.add(slot.slot_id)
        return v
class FeederConfig(BaseModel):
    """Positions associated with the feeder; both fields are required."""

    # Each is a triple of floats; presumably (x, y, z) -- confirm units.
    position: Tuple[float, float, float]
    approach_position: Tuple[float, float, float]
class DropoffGradeConfig(BaseModel):
    """A drop-off grade: an identifier, a position and a capacity threshold."""

    id: str
    # Three floats; presumably (x, y, z) machine coordinates -- confirm units.
    position: Tuple[float, float, float]
    # Required; must lie within the closed interval [0.0, 1.0].
    capacity_threshold: float = Field(ge=0.0, le=1.0)
class MQTTConfig(BaseModel):
    """MQTT connection settings; every field has a default."""

    broker: str = "localhost"
    # Restricted to the valid TCP port range 1..65535.
    port: int = Field(default=1883, ge=1, le=65535)
    # Credentials are optional and default to None.
    username: Optional[str] = None
    password: Optional[str] = None
    # Non-negative; presumably seconds -- confirm against the MQTT client used.
    keepalive: int = Field(default=60, ge=0)
class GRBLConfig(BaseModel):
    """Serial settings for the GRBL controller."""

    # NOTE(review): the default "debug" is presumably a sentinel for running
    # without hardware -- confirm with the code that opens the port.
    port: str = "debug"
    # Must be at least 9600.
    baudrate: int = Field(default=115200, ge=9600)
class VisionConfig(BaseModel):
    """Camera settings; every field has a default."""

    camera_id: int = 0
    # Pair of ints; presumably (width, height) in pixels -- confirm.
    resolution: Tuple[int, int] = (640, 480)
    # At least 1.
    frame_rate: int = Field(default=30, ge=1)
    # Both must be non-negative.
    exposure: float = Field(default=0.1, ge=0.0)
    gain: float = Field(default=1.0, ge=0.0)
class VacuumConfig(BaseModel):
    """Vacuum system limits and timeouts.

    The three pressure fields are each constrained to [-1.0, 1.0]; the two
    time fields must be non-negative.
    """

    min_pressure_bar: float = Field(default=0.5, ge=-1.0, le=1.0)
    max_pressure_bar: float = Field(default=1.0, ge=-1.0, le=1.0)
    max_pump_time_s: float = Field(default=30.0, ge=0.0)
    gripping_threshold_bar: float = Field(default=0.8, ge=-1.0, le=1.0)
    pump_watchdog_timeout_s: float = Field(default=30.0, ge=0.0)

    @model_validator(mode='after')
    def validate_pressure(self) -> 'VacuumConfig':
        """Require ``min_pressure_bar`` to be strictly below ``max_pressure_bar``.

        Raises:
            ValueError: if the minimum is greater than or equal to the maximum.
        """
        lower, upper = self.min_pressure_bar, self.max_pressure_bar
        if lower >= upper:
            raise ValueError("min_pressure must be less than max_pressure")
        return self
class GPIOConfig(BaseModel):
    """GPIO settings: a debug flag and the pump and valve pin numbers."""

    debug: bool = False
    # Pin numbers must be non-negative.
    # NOTE(review): the numbering scheme is not visible here -- confirm.
    pump_pin: int = Field(default=17, ge=0)
    valve_pin: int = Field(default=27, ge=0)
class MovementConfig(BaseModel):
    """Motion parameters; all values must be non-negative."""

    # NOTE(review): units are not stated in this file -- confirm with the
    # motion controller code before changing defaults.
    speed: float = Field(default=400.0, ge=0.0)
    acceleration: float = Field(default=20.0, ge=0.0)
    safe_height: float = Field(default=25.0, ge=0.0)
class LoggingConfig(BaseModel):
    """Logging settings: level, log file location, rotation limits, console flag."""

    # Validated and upper-cased by validate_log_level below.
    level: str = "INFO"
    file_path: Path = Path("logs/robot.log")
    # Both rotation settings must be non-negative.
    max_file_size_mb: int = Field(default=1, ge=0)
    backup_count: int = Field(default=3, ge=0)
    console_output: bool = True

    @field_validator('level')
    @classmethod
    def validate_log_level(cls, v):
        """Accept a level name case-insensitively and return it upper-cased.

        Raises:
            ValueError: if the name is not DEBUG, INFO, WARNING, ERROR or
                CRITICAL.
        """
        valid_levels = ['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL']
        normalized = v.upper()
        if normalized in valid_levels:
            return normalized
        raise ValueError(f"Log level must be one of {valid_levels}")
class RobotConfig(BaseModel):
    """Top-level robot configuration; every section is required."""

    measurement_devices: List[DeviceConfig]
    feeder: FeederConfig
    dropoff_grades: List[DropoffGradeConfig]
    mqtt: MQTTConfig
    grbl: GRBLConfig
    vision: VisionConfig
    vacuum: VacuumConfig
    gpio: GPIOConfig
    movement: MovementConfig
    logging: LoggingConfig

    @model_validator(mode='after')
    def validate_dropoff_grades(self) -> 'RobotConfig':
        """Sort drop-off grades by descending threshold and reject ties.

        The grades are replaced on the model by the sorted list, so the
        order given in the input does not matter.

        Raises:
            ValueError: if two neighbouring grades in the sorted list do not
                have strictly descending thresholds.
        """
        ordered = sorted(
            self.dropoff_grades,
            key=lambda grade: grade.capacity_threshold,
            reverse=True,
        )
        # The list is already sorted high-to-low, so in practice this check
        # fires when two grades share the same threshold.
        for higher, lower in zip(ordered, ordered[1:]):
            if higher.capacity_threshold <= lower.capacity_threshold:
                raise ValueError(
                    f"Dropoff grade thresholds must be in strictly descending order. Found: "
                    f"Grade {higher.id} ({higher.capacity_threshold}) <= "
                    f"Grade {lower.id} ({lower.capacity_threshold})"
                )
        # Keep the sorted order so later reads see a consistent sequence.
        self.dropoff_grades = ordered
        return self
class ConfigParser:
    """Load a robot configuration YAML file into a validated ``RobotConfig``.

    The file is read and validated once, when the parser is constructed.
    """

    def __init__(self, config_path: str = "robot_control/config/config.yaml"):
        """Read and validate the configuration at ``config_path``.

        Raises:
            FileNotFoundError: if ``config_path`` does not exist.
            ValueError: if the file is not a YAML mapping or fails model
                validation (pydantic's ``ValidationError`` is a ``ValueError``).
        """
        self.config_path = Path(config_path)
        self._config = self._load_config()

    def _load_config(self) -> "RobotConfig":
        """Parse the YAML file and build the ``RobotConfig`` model from it."""
        if not self.config_path.exists():
            raise FileNotFoundError(f"Config file not found: {self.config_path}")

        # Explicit encoding so parsing does not depend on the platform locale.
        with open(self.config_path, 'r', encoding='utf-8') as f:
            config_dict = yaml.safe_load(f)

        # safe_load returns None for an empty file and a scalar/list for
        # non-mapping documents; fail with a clear message instead of a
        # TypeError further down.
        if not isinstance(config_dict, dict):
            raise ValueError(
                f"Config file must contain a YAML mapping at the top level, "
                f"got {type(config_dict).__name__}: {self.config_path}"
            )

        self._assign_device_ids(config_dict)
        return RobotConfig(**config_dict)

    @staticmethod
    def _assign_device_ids(config_dict: dict) -> None:
        """Copy each device's ``id`` into the ``device_id`` of its slots, in place.

        Missing or malformed sections are skipped on purpose: model validation
        runs afterwards and reports them with field-level detail, which is
        more useful than a bare ``KeyError`` raised here.
        """
        devices = config_dict.get('measurement_devices')
        if not isinstance(devices, list):
            return
        for device in devices:
            if not isinstance(device, dict):
                continue
            slots = device.get('slots')
            if not isinstance(slots, list):
                continue
            device_id = device.get('id')
            for slot in slots:
                if isinstance(slot, dict):
                    slot['device_id'] = device_id

    @property
    def config(self) -> "RobotConfig":
        """The validated configuration loaded at construction time."""
        return self._config
|