config.py 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205
  1. from pathlib import Path
  2. from typing import List, Dict, Tuple, Optional
  3. from pydantic import BaseModel, Field, model_validator, field_validator
  4. import yaml
  5. # This module defines the configuration schema for the robot control system.
  6. # It uses Pydantic models to validate and structure all configuration options,
  7. # including hardware pin assignments, device positions, magazine layouts, and
  8. # operational parameters. The configuration is loaded from a YAML file and
  9. # provides a single source of truth for all system components.
  10. class SlotConfig(BaseModel):
  11. position: Tuple[float, float, float]
  12. occupied: bool = False
  13. slot_id: int
  14. device_id: Optional[str] = None
  15. cell_id: Optional[int] = None
  16. def __str__(self) -> str:
  17. return f"Slot {self.slot_id}, Device: {self.device_id}"
  18. class DeviceConfig(BaseModel):
  19. id: str
  20. position: Tuple[float, float, float]
  21. slots: List[SlotConfig]
  22. @field_validator('slots')
  23. @classmethod
  24. def validate_slots(cls, v):
  25. slot_ids = [slot.slot_id for slot in v]
  26. if len(slot_ids) != len(set(slot_ids)):
  27. raise ValueError("Duplicate slot IDs found")
  28. return v
  29. class MagDistPosition(BaseModel):
  30. pos_mm: float = Field(default=0.0, ge=0, le=700)
  31. rot_deg: float = Field(default=0.0, ge=-180.0, le=180.0)
  32. class MagazineConfig(BaseModel):
  33. mag_pos: MagDistPosition
  34. max_num_cells: int = Field(default=10, ge=0)
  35. class DefeederMagazineConfig(MagazineConfig):
  36. name: str
  37. health_range: Tuple[int, int] = Field(default=(0, 100))
  38. @field_validator('health_range')
  39. @classmethod
  40. def validate_health_range(cls, v):
  41. min_val = 0
  42. max_val = 100
  43. if not (min_val <= v[0] < v[1] <= max_val):
  44. raise ValueError(f"health_range must be a tuple (min, max) with {min_val} <= min < max <= {max_val}")
  45. return v
  46. class FeederConfig(BaseModel):
  47. robot_pos: Tuple[float, float, float]
  48. mag_pos: MagDistPosition
  49. min_voltage: float = Field(default=2.0, ge=0.0)
  50. max_num_cells: int = Field(default=10, ge=0)
  51. class DefeederConfig(BaseModel):
  52. robot_pos: Tuple[float, float, float]
  53. mag_pos: MagDistPosition
  54. max_num_cells: int = Field(default=10, ge=0)
  55. class MagDistributorConfig(BaseModel):
  56. debug: Optional[bool] = False
  57. max_speed_mmmin: int = Field(default=100, ge=0)
  58. home_speed_mmmin: int = Field(default=50, ge=0)
  59. length_mm: float = Field(default=800.0, ge=0.0)
  60. class MQTTConfig(BaseModel):
  61. broker: str = "localhost"
  62. port: int = Field(default=1883, ge=1, le=65535)
  63. username: Optional[str] = None
  64. password: Optional[str] = None
  65. keepalive: int = Field(default=60, ge=0)
  66. class GRBLConfig(BaseModel):
  67. port: str = "debug"
  68. baudrate: int = Field(default=115200, ge=9600)
  69. class VisionConfig(BaseModel):
  70. camera_id: int = 0
  71. resolution: Tuple[int, int] = (640, 480)
  72. frame_rate: int = Field(default=30, ge=1)
  73. exposure: float = Field(default=0.1, ge=0.0)
  74. gain: float = Field(default=1.0, ge=0.0)
  75. bbox: Tuple[int, int, int, int] = Field(default=(0, 0, 640, 480)) # (x, y, width, height)
  76. @model_validator(mode='after')
  77. def validate_bbox(self) -> 'VisionConfig':
  78. if self.bbox[0] + self.bbox[2] > self.resolution[0]:
  79. raise ValueError("Bounding box exceeds image width")
  80. if self.bbox[1] + self.bbox[3] > self.resolution[1]:
  81. raise ValueError("Bounding box exceeds image height")
  82. return self
  83. class VacuumConfig(BaseModel):
  84. min_pressure_bar: float = Field(default=0.5, ge=-1.0, le=1.0)
  85. max_pressure_bar: float = Field(default=1.0, ge=-1.0, le=1.0)
  86. max_pump_time_s: float = Field(default=30.0, ge=0.0)
  87. gripping_threshold_bar: float = Field(default=0.8, ge=-1.0, le=1.0)
  88. pump_watchdog_timeout_s: float = Field(default=30.0, ge=0.0)
  89. @model_validator(mode='after')
  90. def validate_pressure(self) -> 'VacuumConfig':
  91. if self.min_pressure_bar >= self.max_pressure_bar:
  92. raise ValueError("min_pressure must be less than max_pressure")
  93. return self
  94. class GPIOConfig(BaseModel):
  95. debug: bool = False
  96. pump_pin: int = Field(ge=-1, le=27)
  97. valve_pin: int = Field(ge=-1, le=27)
  98. probe_pin: int = Field(ge=-1, le=27)
  99. measure_dir_pin: int = Field(ge=-1, le=27)
  100. measure_step_pin: int = Field(ge=-1, le=27)
  101. measure_en_pin: int = Field(default=0, ge=-1, le=27)
  102. mag_dist_pos_dir_pin: int = Field(ge=-1, le=27)
  103. mag_dist_pos_step_pin: int = Field(ge=-1, le=27)
  104. mag_dist_pos_en_pin: int = Field(default=0, ge=-1, le=27)
  105. mag_dist_pos_limit_pin: int = Field(ge=-1, le=27)
  106. mag_dist_rot_dir_pin: int = Field(ge=-1, le=27)
  107. mag_dist_rot_step_pin: int = Field(ge=-1, le=27)
  108. mag_dist_rot_en_pin: int = Field(default=0, ge=-1, le=27)
  109. mag_dist_rot_limit_pin: int = Field(ge=-1, le=27)
  110. mag_dist_sensor_pin: int = Field(ge=-1, le=27)
  111. class I2CConfig(BaseModel):
  112. debug: bool = False
  113. class MovementConfig(BaseModel):
  114. speed_mmmin: int = Field(default=400, ge=0)
  115. safe_height: float = Field(default=25.0, ge=0.0)
  116. class LoggingConfig(BaseModel):
  117. level: str = Field(default="INFO")
  118. file_path: Path = Field(default=Path("logs/robot.log"))
  119. max_file_size_mb: int = Field(default=1, ge=0)
  120. backup_count: int = Field(default=3, ge=0)
  121. console_output: bool = True
  122. @field_validator('level')
  123. @classmethod
  124. def validate_log_level(cls, v):
  125. valid_levels = ['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL']
  126. if v.upper() not in valid_levels:
  127. raise ValueError(f"Log level must be one of {valid_levels}")
  128. return v.upper()
  129. class RobotConfig(BaseModel):
  130. measurement_devices: List[DeviceConfig]
  131. feeder: FeederConfig
  132. defeeder: DefeederConfig
  133. feeder_magazines: List[MagazineConfig] = Field(default_factory=list)
  134. defeeder_magazines: List[DefeederMagazineConfig] = Field(default_factory=list)
  135. mag_distributor: MagDistributorConfig
  136. mqtt: MQTTConfig
  137. grbl: GRBLConfig
  138. vision: VisionConfig
  139. vacuum: VacuumConfig
  140. gpio: GPIOConfig
  141. i2c: I2CConfig
  142. movement: MovementConfig
  143. logging: LoggingConfig
  144. @model_validator(mode='after')
  145. def check_lists(self) -> 'RobotConfig':
  146. if not self.measurement_devices:
  147. raise ValueError("measurement_devices list must not be empty")
  148. if not self.feeder_magazines:
  149. raise ValueError("feeder_magazines list must not be empty")
  150. if not self.defeeder_magazines:
  151. raise ValueError("defeeder_magazines list must not be empty")
  152. return self
  153. class ConfigParser:
  154. def __init__(self, config_path: str = "robot_control/config/config.yaml"):
  155. self.config_path = Path(config_path)
  156. self._config = self._load_config()
  157. def _load_config(self) -> RobotConfig:
  158. if not self.config_path.exists():
  159. raise FileNotFoundError(f"Config file not found: {self.config_path}")
  160. with open(self.config_path, 'r') as f:
  161. config_dict = yaml.safe_load(f)
  162. # Set device_id for each slot before creating the config
  163. for device in config_dict['measurement_devices']:
  164. device_id = device['id']
  165. for slot in device['slots']:
  166. slot['device_id'] = device_id
  167. # Set default values for feeder and defeeder magazines
  168. config_dict['feeder_magazines'] = config_dict.get('feeder_magazines', [])
  169. config_dict['defeeder_magazines'] = config_dict.get('defeeder_magazines', [])
  170. return RobotConfig(**config_dict)
  171. @property
  172. def config(self) -> RobotConfig:
  173. return self._config