"""Test script for the loader system.

Builds a ``LoaderSystem`` (robot controller, GPIO backend, I2C handler and
pump controller), then runs one hard-coded pick/drop/insert sequence while
polling the I2C channels in the background. The vision system
(``DataMatrixReader``) is imported but currently disabled.
"""

import asyncio
import logging

from robot_control.src.robot.controller import RobotController, Cell, CellStatus
from robot_control.src.utils.config import ConfigParser
from robot_control.src.vision.datamatrix import DataMatrixReader
from robot_control.src.api.i2c_handler import I2C, MockI2C
from robot_control.src.vendor.mcp3428 import MCP3428
from robot_control.src.robot.pump_controller import PumpController
from robot_control.src.api.gpio import PiGPIO, MockGPIO

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(module)s - %(levelname)s - %(message)s',
)
logger = logging.getLogger(__name__)


class LoaderSystem:
    """Wire the robot controller, GPIO, I2C and pump controller together.

    All collaborators are created in ``__init__`` from the parsed
    configuration; ``run`` then drives a fixed test sequence.
    """

    def __init__(self) -> None:
        """Create every subsystem from the parsed configuration.

        The ``debug`` flags in the ``gpio`` and ``i2c`` config sections
        select the mock backends instead of the hardware ones.
        """
        self.config = ConfigParser().config

        gpio_config = self.config.gpio
        if gpio_config.debug:
            self.gpio = MockGPIO()
        else:
            # NOTE(review): presumably the first list is output pins and the
            # second input pins -- confirm against PiGPIO's signature.
            self.gpio = PiGPIO(
                [gpio_config.pump_pin, gpio_config.valve_pin], []
            )

        self.controller = RobotController(self.config, self.gpio)
        # self.vision = DataMatrixReader(self.config)
        logger.info("Initializing LoaderSystem")
        # self.vision.initialize()

        # Test stuff: fixed drop-off target and slot used by _loader_loop.
        self.test_dropoff = self.config.dropoff_grades[0]
        self.test_slot = self.config.measurement_devices[0].slots[3]

        # Use mock I2C device if debug is enabled
        i2c_device_class = MockI2C if self.config.i2c.debug else MCP3428
        self.i2c = I2C(i2c_device_class)
        self.i2c.initialize()
        logger.info("I2C initialized with %s", i2c_device_class.__name__)

        self.pump_controller = PumpController(self.config, self.gpio)

    async def run(self) -> None:
        """Connect the controller, then run the loader and poller together.

        The polling coroutine loops forever, so this call does not return
        normally: it ends only when one of the coroutines raises or the
        task is cancelled (e.g. Ctrl-C under ``asyncio.run``).
        """
        await self.controller.connect()
        await asyncio.gather(
            self._loader_loop(),
            self._poll_i2c_channels(),
        )

    async def _poll_i2c_channels(self) -> None:
        """Read I2C channels 1, 3 and 4 once per second, forever.

        The channel 4 reading is passed to the pump controller's
        end-effector check and the resulting state is forwarded to the
        robot controller. Errors are logged and polling continues.
        """
        while True:
            try:
                readings = await self.i2c.read_channels([1, 3, 4])
                for channel, value in readings.items():
                    logger.debug(f"Channel {channel} reading: {value}")
                    # if channel == 3:  # Pressure reading
                    #     self.pump_controller.handle_tank_reading(value)
                    if channel == 4:
                        state = self.pump_controller.check_endeffector_state(
                            value
                        )
                        self.controller.set_suction_state(state)
            except Exception as e:
                # Best-effort polling: keep the loop alive on a failed read,
                # but record the traceback so the failure can be diagnosed.
                logger.exception("Error polling I2C channels: %s", e)
            await asyncio.sleep(1)  # Poll every second

    async def _loader_loop(self) -> None:
        """Run the fixed test sequence once.

        Picks a cell from the test slot, drops it off, picks a new cell
        from the feeder and inserts it into the test slot.
        """
        await self.controller.pick_cell_from_slot(self.test_slot)
        await self.controller.dropoff_cell(self.test_dropoff)
        await self.controller.pick_cell_from_feeder()
        cell = Cell(id="test_cell", status=CellStatus.WAITING)
        await self.controller.insert_cell_to_slot(cell, self.test_slot)

    def __del__(self) -> None:
        """Let the pump controller clean up when this object is finalized.

        ``__init__`` can raise before ``pump_controller`` is assigned, and
        ``__del__`` still runs on the half-built instance, so the attribute
        is looked up defensively.
        """
        pump_controller = getattr(self, "pump_controller", None)
        if pump_controller is not None:
            pump_controller.cleanup()  # Ensure PumpController cleans up gpio


if __name__ == "__main__":
    loader_system = LoaderSystem()
    asyncio.run(loader_system.run())