import asyncio import logging from robot_control.src.robot.controller import RobotController from robot_control.src.utils.config import ConfigParser, DefeederMagazineConfig from robot_control.src.vision.datamatrix import DataMatrixReader from robot_control.src.api.i2c_handler import I2C, MockI2C from robot_control.src.vendor.mcp3428 import MCP3428 from robot_control.src.robot.pump_controller import PumpController from robot_control.src.api.gpio import PiGPIO, MockGPIO from robot_control.src.utils.logging import setup_logging from robot_control.src.robot.mag_distributor import MagDistributor class LoaderSystem: def __init__(self): # Load configuration self.config = ConfigParser().config setup_logging(self.config) gpio_config = self.config.gpio # Initialize GPIO (mock or real based on debug flag) if gpio_config.debug: self.gpio = MockGPIO() else: self.gpio = PiGPIO(out_pins=[gpio_config.pump_pin, gpio_config.valve_pin]) self.logger = logging.getLogger(__name__) # Initialize camera system self.vision = DataMatrixReader(self.config.vision) self.logger.info("Initializing LoaderSystem") self.vision.initialize() # Initialize I2C (mock or real based on debug flag) i2c_device_class = MCP3428 if not self.config.i2c.debug else MockI2C self.i2c = I2C(i2c_device_class) self.i2c.initialize() self.logger.info(f"I2C initialized with {i2c_device_class.__name__}") # Initialize pump controller self.pump_controller = PumpController(self.config, self.gpio) # Initialize magazine distributor self.mag_distributor = MagDistributor(self.config, self.gpio) # Create feeder and defeeder queues used for async handling of cell # Magazine -> MagDist -> Feeder -> Robot # Magazine <- MagDist <- Defeeder <- Robot self.feeder_queue: asyncio.Queue[int] = asyncio.Queue(self.config.feeder.max_num_cells) self.defeeder_queue: asyncio.Queue[DefeederMagazineConfig] = asyncio.Queue(self.config.defeeder.max_num_cells) # Pass all hardware interfaces to the controller self.controller = RobotController( self.config, self.gpio, self.i2c, self.vision, self.pump_controller, self.feeder_queue, self.defeeder_queue ) async def run(self): """ Main entry point for running the loader system. Starts all main async loops and ensures cleanup on exit. """ await self.controller.connect() try: await asyncio.gather( self._loader_loop(), # Main loader orchestration loop self._poll_i2c_channels(), # Poll I2C sensors self._queue_monitor_loop(), # Monitor feeder/defeeder queues ) finally: self.cleanup() self.logger.info("Cleaning up resources...") async def _poll_i2c_channels(self): """ Periodically poll I2C channels for sensor readings. Handles pressure and end-effector state updates. """ while True: try: readings = await self.i2c.read_channels([3, 4]) # channel 1 is handled on demand in Controller for channel, value in readings.items(): self.logger.debug(f"Channel {channel} reading: {value}") if channel == 3: # Pressure reading self.pump_controller.handle_tank_reading(value) if channel == 4: # End-effector state state = self.pump_controller.check_endeffector_state(value) self.controller.set_suction_state(state) except Exception as e: self.logger.error(f"Error polling I2C channels: {str(e)}") await asyncio.sleep(1) # Poll every second async def _loader_loop(self): """ Main loop for the loader system. Orchestrates cell preparation, slot filling, and measurement processing. """ while True: await asyncio.sleep(0.1) # Avoid busy loop while True: # Prepare a cell in the feeder (returns True if a cell is ready) if not await self.controller.prepare_feeder_cell(): break # Fill the next free slot (returns True if a cell was placed) if not await self.controller.fill_next_free_slot(): break # Check for completed measurements and sort cells await self.controller.process_finished_measurements() async def _queue_monitor_loop(self): """ Periodically checks feeder and defeeder queues and calls placeholder functions. Handles refilling the feeder and processing the defeeder. """ while True: # Feeder: If queue below max, trigger refill placeholder if not self.feeder_queue.full(): self.logger.info(f"Refilling feeder...") await asyncio.get_event_loop().run_in_executor( None, self.mag_distributor.mag_to_feeder ) await self.feeder_queue.put(1) # Add to queue # Defeeder: If queue not empty, process next cell if not self.defeeder_queue.empty(): magazine = await self.defeeder_queue.get() # Remove from queue self.logger.info(f"Processing defeeder to magazine {magazine.name}...") await asyncio.get_event_loop().run_in_executor( None, self.mag_distributor.defeeder_to_mag, magazine ) await asyncio.sleep(2) # Adjust interval as needed def cleanup(self): """ Cleanup hardware resources (GPIO, etc.). """ self.gpio.cleanup() # Ensure PumpController cleans up gpio if __name__ == "__main__": loader_system = LoaderSystem() asyncio.run(loader_system.run())