config.py 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143
  1. from pathlib import Path
  2. from typing import List, Dict, Tuple, Optional
  3. from pydantic import BaseModel, Field, model_validator, field_validator
  4. import yaml
  5. class SlotConfig(BaseModel):
  6. position: Tuple[float, float, float]
  7. occupied: bool = False
  8. slot_id: int
  9. device_id: Optional[str] = None
  10. cell_id: Optional[str] = None
  11. def __str__(self) -> str:
  12. return f"Slot {self.slot_id}, Device: {self.device_id}"
  13. class DeviceConfig(BaseModel):
  14. id: str
  15. position: Tuple[float, float, float]
  16. slots: List[SlotConfig]
  17. @field_validator('slots')
  18. @classmethod
  19. def validate_slots(cls, v):
  20. slot_ids = [slot.slot_id for slot in v]
  21. if len(slot_ids) != len(set(slot_ids)):
  22. raise ValueError("Duplicate slot IDs found")
  23. return v
  24. class FeederConfig(BaseModel):
  25. position: Tuple[float, float, float]
  26. approach_position: Tuple[float, float, float]
  27. class DropoffGradeConfig(BaseModel):
  28. id: str
  29. position: Tuple[float, float, float]
  30. capacity_threshold: float = Field(ge=0.0, le=1.0)
  31. class MQTTConfig(BaseModel):
  32. broker: str = "localhost"
  33. port: int = Field(default=1883, ge=1, le=65535)
  34. username: Optional[str] = None
  35. password: Optional[str] = None
  36. keepalive: int = Field(default=60, ge=0)
  37. class GRBLConfig(BaseModel):
  38. port: str = "debug"
  39. baudrate: int = Field(default=115200, ge=9600)
  40. class VisionConfig(BaseModel):
  41. camera_id: int = 0
  42. resolution: Tuple[int, int] = (640, 480)
  43. frame_rate: int = Field(default=30, ge=1)
  44. exposure: float = Field(default=0.1, ge=0.0)
  45. gain: float = Field(default=1.0, ge=0.0)
  46. class VacuumConfig(BaseModel):
  47. min_pressure_bar: float = Field(default=0.5, ge=-1.0, le=1.0)
  48. max_pressure_bar: float = Field(default=1.0, ge=-1.0, le=1.0)
  49. max_pump_time_s: float = Field(default=30.0, ge=0.0)
  50. gripping_threshold_bar: float = Field(default=0.8, ge=-1.0, le=1.0)
  51. pump_watchdog_timeout_s: float = Field(default=30.0, ge=0.0)
  52. @model_validator(mode='after')
  53. def validate_pressure(self) -> 'VacuumConfig':
  54. if self.min_pressure_bar >= self.max_pressure_bar:
  55. raise ValueError("min_pressure must be less than max_pressure")
  56. return self
  57. class GPIOConfig(BaseModel):
  58. debug: bool = False
  59. pump_pin: int = Field(default=17, ge=0)
  60. valve_pin: int = Field(default=27, ge=0)
  61. class MovementConfig(BaseModel):
  62. speed: float = Field(default=400.0, ge=0.0)
  63. acceleration: float = Field(default=20.0, ge=0.0)
  64. safe_height: float = Field(default=25.0, ge=0.0)
  65. class LoggingConfig(BaseModel):
  66. level: str = Field(default="INFO")
  67. file_path: Path = Field(default=Path("logs/robot.log"))
  68. max_file_size_mb: int = Field(default=1, ge=0)
  69. backup_count: int = Field(default=3, ge=0)
  70. console_output: bool = True
  71. @field_validator('level')
  72. @classmethod
  73. def validate_log_level(cls, v):
  74. valid_levels = ['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL']
  75. if v.upper() not in valid_levels:
  76. raise ValueError(f"Log level must be one of {valid_levels}")
  77. return v.upper()
  78. class RobotConfig(BaseModel):
  79. measurement_devices: List[DeviceConfig]
  80. feeder: FeederConfig
  81. dropoff_grades: List[DropoffGradeConfig]
  82. mqtt: MQTTConfig
  83. grbl: GRBLConfig
  84. vision: VisionConfig
  85. vacuum: VacuumConfig
  86. gpio: GPIOConfig
  87. movement: MovementConfig
  88. logging: LoggingConfig
  89. @model_validator(mode='after')
  90. def validate_dropoff_grades(self) -> 'RobotConfig':
  91. # Sort grades by threshold in descending order for consistent behavior
  92. sorted_grades = sorted(self.dropoff_grades, key=lambda x: x.capacity_threshold, reverse=True)
  93. for i in range(1, len(sorted_grades)):
  94. if sorted_grades[i-1].capacity_threshold <= sorted_grades[i].capacity_threshold:
  95. raise ValueError(
  96. f"Dropoff grade thresholds must be in strictly descending order. Found: "
  97. f"Grade {sorted_grades[i-1].id} ({sorted_grades[i-1].capacity_threshold}) <= "
  98. f"Grade {sorted_grades[i].id} ({sorted_grades[i].capacity_threshold})"
  99. )
  100. # Store sorted grades to ensure consistent behavior
  101. self.dropoff_grades = sorted_grades
  102. return self
  103. class ConfigParser:
  104. def __init__(self, config_path: str = "robot_control/config/config.yaml"):
  105. self.config_path = Path(config_path)
  106. self._config = self._load_config()
  107. def _load_config(self) -> RobotConfig:
  108. if not self.config_path.exists():
  109. raise FileNotFoundError(f"Config file not found: {self.config_path}")
  110. with open(self.config_path, 'r') as f:
  111. config_dict = yaml.safe_load(f)
  112. # Set device_id for each slot before creating the config
  113. for device in config_dict['measurement_devices']:
  114. device_id = device['id']
  115. for slot in device['slots']:
  116. slot['device_id'] = device_id
  117. return RobotConfig(**config_dict)
  118. @property
  119. def config(self) -> RobotConfig:
  120. return self._config