|
|
@@ -1,151 +1,143 @@
|
|
|
-from dataclasses import dataclass
|
|
|
-from typing import List, Dict, Tuple
|
|
|
-import yaml
|
|
|
from pathlib import Path
|
|
|
+from typing import List, Dict, Tuple, Optional
|
|
|
+from pydantic import BaseModel, Field, model_validator, field_validator
|
|
|
+import yaml
|
|
|
|
|
|
-@dataclass
|
|
|
-class SlotConfig:
|
|
|
+class SlotConfig(BaseModel):
|
|
|
position: Tuple[float, float, float]
|
|
|
- occupied: bool
|
|
|
+ occupied: bool = False
|
|
|
slot_id: int
|
|
|
- device_id: str
|
|
|
- cell_id: str = None
|
|
|
+ device_id: Optional[str] = None
|
|
|
+ cell_id: Optional[str] = None
|
|
|
|
|
|
def __str__(self) -> str:
|
|
|
return f"Slot {self.slot_id}, Device: {self.device_id}"
|
|
|
|
|
|
-@dataclass
|
|
|
-class DeviceConfig:
|
|
|
+class DeviceConfig(BaseModel):
|
|
|
id: str
|
|
|
position: Tuple[float, float, float]
|
|
|
slots: List[SlotConfig]
|
|
|
|
|
|
-@dataclass
|
|
|
-class FeederConfig:
|
|
|
+ @field_validator('slots')
|
|
|
+ @classmethod
|
|
|
+ def validate_slots(cls, v):
|
|
|
+ slot_ids = [slot.slot_id for slot in v]
|
|
|
+ if len(slot_ids) != len(set(slot_ids)):
|
|
|
+ raise ValueError("Duplicate slot IDs found")
|
|
|
+ return v
|
|
|
+
|
|
|
+class FeederConfig(BaseModel):
|
|
|
position: Tuple[float, float, float]
|
|
|
approach_position: Tuple[float, float, float]
|
|
|
|
|
|
-@dataclass
|
|
|
-class DropoffGradeConfig:
|
|
|
- name: str
|
|
|
+class DropoffGradeConfig(BaseModel):
|
|
|
+ id: str
|
|
|
position: Tuple[float, float, float]
|
|
|
- capacity_threshold: float
|
|
|
+ capacity_threshold: float = Field(ge=0.0, le=1.0)
|
|
|
+
|
|
|
+class MQTTConfig(BaseModel):
|
|
|
+ broker: str = "localhost"
|
|
|
+ port: int = Field(default=1883, ge=1, le=65535)
|
|
|
+ username: Optional[str] = None
|
|
|
+ password: Optional[str] = None
|
|
|
+ keepalive: int = Field(default=60, ge=0)
|
|
|
+
|
|
|
+class GRBLConfig(BaseModel):
|
|
|
+ port: str = "debug"
|
|
|
+ baudrate: int = Field(default=115200, ge=9600)
|
|
|
+
|
|
|
+class VisionConfig(BaseModel):
|
|
|
+ camera_id: int = 0
|
|
|
+ resolution: Tuple[int, int] = (640, 480)
|
|
|
+ frame_rate: int = Field(default=30, ge=1)
|
|
|
+ exposure: float = Field(default=0.1, ge=0.0)
|
|
|
+ gain: float = Field(default=1.0, ge=0.0)
|
|
|
+
|
|
|
+class VacuumConfig(BaseModel):
|
|
|
+ min_pressure_bar: float = Field(default=0.5, ge=-1.0, le=1.0)
|
|
|
+ max_pressure_bar: float = Field(default=1.0, ge=-1.0, le=1.0)
|
|
|
+ max_pump_time_s: float = Field(default=30.0, ge=0.0)
|
|
|
+ gripping_threshold_bar: float = Field(default=0.8, ge=-1.0, le=1.0)
|
|
|
+ pump_watchdog_timeout_s: float = Field(default=30.0, ge=0.0)
|
|
|
+
|
|
|
+ @model_validator(mode='after')
|
|
|
+ def validate_pressure(self) -> 'VacuumConfig':
|
|
|
+ if self.min_pressure_bar >= self.max_pressure_bar:
|
|
|
+ raise ValueError("min_pressure must be less than max_pressure")
|
|
|
+ return self
|
|
|
|
|
|
-@dataclass
|
|
|
-class SystemConfig:
|
|
|
- speed: float
|
|
|
- acceleration: float
|
|
|
- safe_height: float
|
|
|
+class GPIOConfig(BaseModel):
|
|
|
+ debug: bool = False
|
|
|
+ pump_pin: int = Field(default=17, ge=0)
|
|
|
+ valve_pin: int = Field(default=27, ge=0)
|
|
|
+
|
|
|
+class MovementConfig(BaseModel):
|
|
|
+ speed: float = Field(default=400.0, ge=0.0)
|
|
|
+ acceleration: float = Field(default=20.0, ge=0.0)
|
|
|
+ safe_height: float = Field(default=25.0, ge=0.0)
|
|
|
+
|
|
|
+class LoggingConfig(BaseModel):
|
|
|
+ level: str = Field(default="INFO")
|
|
|
+ file_path: Path = Field(default=Path("logs/robot.log"))
|
|
|
+ max_file_size_mb: int = Field(default=1, ge=0)
|
|
|
+ backup_count: int = Field(default=3, ge=0)
|
|
|
+ console_output: bool = True
|
|
|
+
|
|
|
+ @field_validator('level')
|
|
|
+ @classmethod
|
|
|
+ def validate_log_level(cls, v):
|
|
|
+ valid_levels = ['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL']
|
|
|
+ if v.upper() not in valid_levels:
|
|
|
+ raise ValueError(f"Log level must be one of {valid_levels}")
|
|
|
+ return v.upper()
|
|
|
+
|
|
|
+class RobotConfig(BaseModel):
|
|
|
+ measurement_devices: List[DeviceConfig]
|
|
|
+ feeder: FeederConfig
|
|
|
+ dropoff_grades: List[DropoffGradeConfig]
|
|
|
+ mqtt: MQTTConfig
|
|
|
+ grbl: GRBLConfig
|
|
|
+ vision: VisionConfig
|
|
|
+ vacuum: VacuumConfig
|
|
|
+ gpio: GPIOConfig
|
|
|
+ movement: MovementConfig
|
|
|
+ logging: LoggingConfig
|
|
|
+
|
|
|
+ @model_validator(mode='after')
|
|
|
+ def validate_dropoff_grades(self) -> 'RobotConfig':
|
|
|
+ # Sort grades by threshold in descending order for consistent behavior
|
|
|
+ sorted_grades = sorted(self.dropoff_grades, key=lambda x: x.capacity_threshold, reverse=True)
|
|
|
+ for i in range(1, len(sorted_grades)):
|
|
|
+ if sorted_grades[i-1].capacity_threshold <= sorted_grades[i].capacity_threshold:
|
|
|
+ raise ValueError(
|
|
|
+ f"Dropoff grade thresholds must be in strictly descending order. Found: "
|
|
|
+ f"Grade {sorted_grades[i-1].id} ({sorted_grades[i-1].capacity_threshold}) <= "
|
|
|
+ f"Grade {sorted_grades[i].id} ({sorted_grades[i].capacity_threshold})"
|
|
|
+ )
|
|
|
+ # Store sorted grades to ensure consistent behavior
|
|
|
+ self.dropoff_grades = sorted_grades
|
|
|
+ return self
|
|
|
|
|
|
class ConfigParser:
|
|
|
def __init__(self, config_path: str = "robot_control/config/config.yaml"):
|
|
|
self.config_path = Path(config_path)
|
|
|
- self.config = self._load_config()
|
|
|
+ self._config = self._load_config()
|
|
|
|
|
|
- def _load_config(self):
|
|
|
+ def _load_config(self) -> RobotConfig:
|
|
|
+ if not self.config_path.exists():
|
|
|
+ raise FileNotFoundError(f"Config file not found: {self.config_path}")
|
|
|
+
|
|
|
with open(self.config_path, 'r') as f:
|
|
|
- return yaml.safe_load(f)
|
|
|
-
|
|
|
- def get_devices(self) -> List[DeviceConfig]:
|
|
|
- devices = []
|
|
|
- for device in self.config['measurement_devices']:
|
|
|
- slots = [
|
|
|
- SlotConfig(
|
|
|
- position=tuple(slot['position']),
|
|
|
- occupied=slot['occupied'],
|
|
|
- slot_id=slot['slot_id'],
|
|
|
- device_id=device['id'],
|
|
|
- cell_id=slot['cell_id'] if slot['occupied'] else None,
|
|
|
- )
|
|
|
- for slot in device['slots']
|
|
|
- ]
|
|
|
- devices.append(DeviceConfig(
|
|
|
- id=device['id'],
|
|
|
- position=tuple(device['position']),
|
|
|
- slots=slots
|
|
|
- ))
|
|
|
- return devices
|
|
|
-
|
|
|
- def get_feeder(self) -> FeederConfig:
|
|
|
- feeder = self.config['feeder']
|
|
|
- return FeederConfig(
|
|
|
- position=tuple(feeder['position']),
|
|
|
- approach_position=tuple(feeder['approach_position'])
|
|
|
- )
|
|
|
-
|
|
|
- def get_dropoff_grades(self) -> Dict[str, DropoffGradeConfig]:
|
|
|
- grades = {}
|
|
|
- for name, grade in self.config['dropoff_grades'].items():
|
|
|
- grades[name] = DropoffGradeConfig(
|
|
|
- name = name,
|
|
|
- position=tuple(grade['position']),
|
|
|
- capacity_threshold=grade['capacity_threshold']
|
|
|
- )
|
|
|
- return grades
|
|
|
-
|
|
|
- def get_system_settings(self) -> SystemConfig:
|
|
|
- settings = self.config['system_settings']
|
|
|
- return SystemConfig(
|
|
|
- speed=settings['speed'],
|
|
|
- acceleration=settings['acceleration'],
|
|
|
- safe_height=settings['safe_height']
|
|
|
- )
|
|
|
-
|
|
|
- def get_mqtt_config(self) -> dict:
|
|
|
- """Get MQTT broker configuration"""
|
|
|
- mqtt_config = self.config.get('mqtt', {})
|
|
|
- return {
|
|
|
- 'broker': mqtt_config.get('broker', 'localhost'),
|
|
|
- 'port': mqtt_config.get('port', 1883),
|
|
|
- 'username': mqtt_config.get('username', None),
|
|
|
- 'password': mqtt_config.get('password', None),
|
|
|
- 'keepalive': mqtt_config.get('keepalive', 60)
|
|
|
- }
|
|
|
-
|
|
|
- def get_grbl_config(self) -> dict:
|
|
|
- """Get GRBL handler configuration"""
|
|
|
- grbl_config = self.config.get('grbl', {})
|
|
|
- return {
|
|
|
- 'port': grbl_config.get('port', None),
|
|
|
- 'baudrate': grbl_config.get('baudrate', 115200),
|
|
|
- }
|
|
|
-
|
|
|
- def get_vision_config(self) -> dict:
|
|
|
- """Get vision system configuration"""
|
|
|
- vision_config = self.config.get('vision', {})
|
|
|
- return {
|
|
|
- 'camera_id': vision_config.get('camera_id', 0),
|
|
|
- 'resolution': tuple(vision_config.get('resolution', (640, 480))),
|
|
|
- 'frame_rate': vision_config.get('frame_rate', 30),
|
|
|
- 'exposure': vision_config.get('exposure', 0.1),
|
|
|
- 'gain': vision_config.get('gain', 1.0)
|
|
|
- }
|
|
|
-
|
|
|
- def get_vacuum_config(self) -> dict:
|
|
|
- """Get vacuum system configuration"""
|
|
|
- vacuum_config = self.config.get('vacuum', {})
|
|
|
- return {
|
|
|
- 'min_pressure_bar': vacuum_config.get('min_pressure_bar', 0),
|
|
|
- 'max_pressure_bar': vacuum_config.get('max_pressure_bar', 0),
|
|
|
- 'max_pump_time_s': vacuum_config.get('max_pump_time_s', 30),
|
|
|
- 'gripping_threshold_v': vacuum_config.get('gripping_threshold_v', 0.8)
|
|
|
- }
|
|
|
-
|
|
|
- def get_gpio_config(self) -> dict:
|
|
|
- """Get gpio system configuration"""
|
|
|
- gpio_config = self.config.get('gpio', {})
|
|
|
- return {
|
|
|
- 'pump_pin': gpio_config.get('pump_pin', 17),
|
|
|
- 'valve_pin': gpio_config.get('valve_pin', 27)
|
|
|
- }
|
|
|
-
|
|
|
- def get_logging_config(self) -> dict:
|
|
|
- """Get logging configuration"""
|
|
|
- log_config = self.config.get('logging', {})
|
|
|
- return {
|
|
|
- 'level': log_config.get('level', 'INFO'),
|
|
|
- 'file_path': log_config.get('file_path', 'logs/robot.log'),
|
|
|
- 'max_file_size_mb': log_config.get('max_file_size_mb', 10),
|
|
|
- 'backup_count': log_config.get('backup_count', 5),
|
|
|
- 'console_output': log_config.get('console_output', True)
|
|
|
- }
|
|
|
+ config_dict = yaml.safe_load(f)
|
|
|
+
|
|
|
+ # Set device_id for each slot before creating the config
|
|
|
+ for device in config_dict['measurement_devices']:
|
|
|
+ device_id = device['id']
|
|
|
+ for slot in device['slots']:
|
|
|
+ slot['device_id'] = device_id
|
|
|
+
|
|
|
+ return RobotConfig(**config_dict)
|
|
|
+
|
|
|
+ @property
|
|
|
+ def config(self) -> RobotConfig:
|
|
|
+ return self._config
|