import signal import sys import asyncio import logging from robot_control.src.robot.controller import RobotController, Cell, CellStatus from robot_control.src.utils.config import ConfigParser, DefeederMagazineConfig from robot_control.src.utils.logging import setup_logging from robot_control.src.vision.datamatrix import DataMatrixReader from robot_control.src.api.i2c_handler import I2C, MockI2C from robot_control.src.vendor.mcp3428 import MCP3428 from robot_control.src.robot.pump_controller import PumpController from robot_control.src.api.gpio import PiGPIO, MockGPIO from robot_control.src.robot.mag_distributor import MagDistributor """ This is a test script for the loader system. It initializes the robot controller, vision system, and I2C handler, """ # Configure root logger logging.basicConfig( level=logging.INFO, # show INFO and above (use DEBUG for more details) format="%(asctime)s [%(levelname)s] %(name)s: %(message)s" ) # Make sure all existing loggers propagate to root for name, logger_obj in logging.Logger.manager.loggerDict.items(): if isinstance(logger_obj, logging.Logger): logger_obj.setLevel(logging.INFO) logger_obj.propagate = True async def wait_for_enter(): # Use asyncio.Event to coordinate between input and async code event = asyncio.Event() def input_callback(): logging.info("Press Enter to continue...") input() # Wait for Enter event.set() # Run input in a separate thread to not block the event loop loop = asyncio.get_running_loop() await loop.run_in_executor(None, input_callback) await event.wait() class LoaderSystem: def __init__(self): self.config = ConfigParser().config setup_logging(self.config) # Set up logging with config gpio_config = self.config.gpio if gpio_config.debug: self.gpio = MockGPIO() else: self.gpio = PiGPIO(out_pins=[gpio_config.pump_pin, gpio_config.valve_pin, gpio_config.mag_dist_pos_dir_pin, gpio_config.mag_dist_pos_step_pin, gpio_config.mag_dist_pos_en_pin]) # Initialize vision system self.vision = DataMatrixReader(self.config.vision) # Initialize pump controller self.pump_controller = PumpController(self.config, self.gpio) # Initialize magazine distributor self.mag_distributor = MagDistributor(self.config, self.gpio) i2c_device_class = MCP3428 if not self.config.i2c.debug else MockI2C self.i2c = I2C(i2c_device_class) self.i2c.initialize() self.feeder_queue: asyncio.Queue[int] = asyncio.Queue(self.config.feeder.max_num_cells) self.defeeder_queue: asyncio.Queue[DefeederMagazineConfig] = asyncio.Queue(self.config.defeeder.max_num_cells) # Initialize robot controller with all required arguments self.controller = RobotController( self.config, self.gpio, self.i2c, self.vision, self.pump_controller, self.feeder_queue, self.defeeder_queue ) self.logger = logging.getLogger(__name__) self.logger.info("Initializing LoaderSystem") # Test stuff self.test_drop_slot = self.config.measurement_devices[0].slots[2] self.test_pickup_slot = self.config.measurement_devices[0].slots[3] # Use mock I2C device if debug is enabled self.i2c.initialize() self.logger.info(f"I2C initialized with {i2c_device_class.__name__}") self.pump_controller = PumpController(self.config, self.gpio) async def run(self): await self.controller.connect() await asyncio.gather( self._loader_loop(), self._poll_i2c_channels() ) async def _poll_i2c_channels(self): while True: try: readings = await self.i2c.read_channels([1, 3, 4]) for channel, value in readings.items(): self.logger.debug(f"Channel {channel} reading: {value}") if channel == 3: # Pressure reading self.pump_controller.handle_tank_reading(value) if channel == 4: state = self.pump_controller.check_endeffector_state(value) self.controller.set_suction_state(state) except Exception as e: self.logger.error(f"Error polling I2C channels: {str(e)}") await asyncio.sleep(1) # Poll every second async def _loader_loop(self): while True: await self.mag_distributor.home() self.mag_distributor.mag_to_feeder() await self.feeder_queue.put(1) await self.controller.prepare_feeder_cell() # await self.controller.pick_cell_from_feeder() # cell = Cell(id=1, status=CellStatus.WAITING) # await self.controller.insert_cell_to_slot(cell, self.test_drop_slot) # await self.controller.pick_cell_from_slot(self.test_pickup_slot) # await self.controller.dropoff_cell() # await wait_for_enter() # # Feeding with MagDistributor # ########################### # self.logger.info("Homing the Magazin Distributor Axis...") # await self.mag_distributor.home() # self.logger.info("Picking up a cell from magazine and placing it into feeder using MagDistributor...") # self.mag_distributor.mag_to_feeder() # await self.feeder_queue.put(1) # self.logger.info("Done.") # await wait_for_enter() # # Prepare Feeder Cell # ############################## # self.logger.info("Preparing feeder cell...") # await self.controller.prepare_feeder_cell() # self.logger.info("Done.") # await wait_for_enter() # # Feeder -> Slot # ############################## # self.logger.info("Picking up a cell from feeder and placing it into slot...") # await self.controller.pick_cell_from_feeder() # cell = Cell(id=1, status=CellStatus.WAITING) # await self.controller.insert_cell_to_slot(cell, self.test_drop_slot) # await wait_for_enter() # # Slot -> Defeeder # ############################## # self.logger.info("Picking up a cell from slot and placing it into defeeder...") # await self.controller.pick_cell_from_slot(self.test_pickup_slot) # await self.controller.dropoff_cell() # self.logger.info("Done.") # await wait_for_enter() # # Defeeding with MagDistributor # ########################### # self.logger.info("Defeeding a cell from feeder to magazine using MagDistributor...") # self.mag_distributor.defeeder_to_mag(self.config.defeeder_magazines[0]) # self.logger.info("Done.") self.logger.info("\nPress Enter to repeat sequence (or Ctrl+C to exit)...") async def cleanup(self): self.logger.info("Cleaning up resources...") await self.controller.cleanup() self.gpio.cleanup() if __name__ == "__main__": loader_system = LoaderSystem() async def shutdown(): loader_system.logger.info("Shutting down...") await loader_system.cleanup() sys.exit(0) def handle_signal(sig, frame): asyncio.get_event_loop().create_task(shutdown()) signal.signal(signal.SIGINT, handle_signal) signal.signal(signal.SIGTERM, handle_signal) asyncio.run(loader_system.run())