import asyncio
import logging

from robot_control.src.robot.controller import RobotController
from robot_control.src.utils.config import ConfigParser
from robot_control.src.vision.datamatrix import DataMatrixReader
from robot_control.src.api.i2c_handler import I2C, MockI2C
from robot_control.src.vendor.mcp3428 import MCP3428
from robot_control.src.robot.pump_controller import PumpController
from robot_control.src.api.gpio import PiGPIO, MockGPIO

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(module)s - %(levelname)s - %(message)s',
)


class LoaderSystem:
    """Coordinates feeder, vision, I2C sensing, pump and robot controller.

    The system runs two concurrent coroutines (see ``run``): a loader loop
    that moves cells from the feeder into free slots, and a poller that reads
    I2C channels and forwards the readings to the pump controller.
    """

    def __init__(self):
        """Build all subsystems from the parsed configuration.

        Mock GPIO / I2C back-ends are selected when the respective ``debug``
        flag is set in the configuration.
        """
        self.config = ConfigParser().config
        gpio_config = self.config.gpio
        if gpio_config.debug:
            self.gpio = MockGPIO()
        else:
            self.gpio = PiGPIO(out_pins=[gpio_config.pump_pin, gpio_config.valve_pin])

        self.logger = logging.getLogger(__name__)
        self.controller = RobotController(self.config, self.gpio)
        self.vision = DataMatrixReader(self.config.vision)

        self.logger.info("Initializing LoaderSystem")
        self.vision.initialize()

        # Use mock I2C device if debug is enabled
        i2c_device_class = MCP3428 if not self.config.i2c.debug else MockI2C
        self.i2c = I2C(i2c_device_class)
        self.i2c.initialize()
        self.logger.info(f"I2C initialized with {i2c_device_class.__name__}")

        self.pump_controller = PumpController(self.config, self.gpio)
        # ID of the cell currently waiting in the feeder that already passed
        # the voltage check; empty string means "no prepared cell".
        self.next_cell_id = ""

    async def run(self):
        """Connect to the robot and run loader loop and I2C poller concurrently.

        ``cleanup`` is always invoked on exit, including when connecting to
        the controller fails, because GPIO was already set up in ``__init__``.
        """
        try:
            await self.controller.connect()
            await asyncio.gather(
                self._loader_loop(),
                self._poll_i2c_channels(),
            )
        finally:
            self.logger.info("Cleaning up resources...")
            self.cleanup()

    async def _poll_i2c_channels(self):
        """Poll I2C channels 1, 3 and 4 once per second, forever.

        Channel 3 is passed to the pump controller as the tank reading;
        channel 4 is converted to an end-effector state which is forwarded to
        the robot controller. Errors are logged and polling continues.
        """
        while True:
            try:
                readings = await self.i2c.read_channels([1, 3, 4])
                for channel, value in readings.items():
                    self.logger.debug(f"Channel {channel} reading: {value}")
                    if channel == 3:  # Pressure reading
                        self.pump_controller.handle_tank_reading(value)
                    if channel == 4:
                        state = self.pump_controller.check_endeffector_state(value)
                        self.controller.set_suction_state(state)
            except Exception as e:
                # Best-effort poller: a failed read must not stop the loop.
                self.logger.error(f"Error polling I2C channels: {e}")
            await asyncio.sleep(1)  # Poll every second

    def check_cell_voltage(self, voltage, cell_id_str=""):
        """Return True if ``voltage`` is acceptable for loading.

        A negative voltage is reported as wrong polarity; a non-negative
        voltage below ``abs(config.feeder.min_voltage)`` is reported as too
        low. Both cases return False.

        Args:
            voltage: Measured cell voltage.
            cell_id_str: Cell identifier, used only in log messages.
        """
        # Polarity must be tested first: every negative value is also below
        # the (non-negative) minimum, which would otherwise hide this case.
        if voltage < 0:
            self.logger.info(f"Cell {cell_id_str} has wrong polarity, discarding cell")
            return False
        if voltage < abs(self.config.feeder.min_voltage):
            self.logger.info(f"Cell {cell_id_str} voltage too low, discarding cell")
            return False
        self.logger.info(f"Cell {cell_id_str} voltage({voltage}) is good")
        return True

    async def _measure_cell_voltage(self):
        """Deploy the probe, read the cell voltage from I2C channel 1, retract."""
        probe_pin = self.config.gpio.probe_pin
        self.gpio.set_pin(probe_pin, 1)
        try:
            await asyncio.sleep(0.1)  # Wait for probe to deploy
            return await self.i2c.read_channel(1)  # Measure cell voltage
        finally:
            # Always retract the probe, even if the read fails.
            self.gpio.set_pin(probe_pin, 0)

    async def _eject_cell_from_feeder(self):
        """Step the feeder half a turn and wait for the cell to be ejected."""
        io_conf = self.config.gpio
        # Exactly half a turn
        self.gpio.do_step(io_conf.measure_dir_pin, io_conf.measure_step_pin, 1600, 200)
        await asyncio.sleep(1)  # Wait for cell to be ejected

    async def _prepare_feeder_cell(self) -> None:
        """Prepare the next cell in the feeder.

        Reads the data matrix of the cell in the feeder and measures its
        voltage. A good cell's ID is left in ``self.next_cell_id``. A bad cell
        is ejected, picked up and dropped off by the robot, and the next cell
        is tried. The method returns (with ``self.next_cell_id`` empty) when
        no cell is detected or when discarding a cell fails.
        """
        while not self.next_cell_id:
            self.next_cell_id = self.vision.read_datamatrix()
            if not self.next_cell_id:
                self.logger.debug("No cell detected")
                return  # No cell detected

            cell_v = await self._measure_cell_voltage()
            self.logger.debug(f"Cell voltage: {cell_v}")
            if self.check_cell_voltage(cell_v, self.next_cell_id):
                return  # Desired case!

            # Discard cell directly from feeder
            rejected_id = self.next_cell_id
            self.logger.info(f"Cell {rejected_id} voltage({cell_v}) is bad, discarding cell")
            await self._eject_cell_from_feeder()
            # The scanned ID no longer describes the cell at the measuring
            # position once the feeder has turned.
            self.next_cell_id = ""
            try:
                await self.controller.pick_cell_from_feeder()
                await self.controller.dropoff_cell()
            except Exception as e:
                self.logger.error(f"Failed to process cell {rejected_id}: {e}")
                return

    async def _loader_loop(self):
        """
        Main loop for the loader system.
        Checks for free slots and tries to fill them with cells.
        If no more free slots are available, it checks for completed
        measurements and sorts the cells accordingly.
        """
        while True:
            await asyncio.sleep(0.1)  # avoid busy loop

            # Check for free slots loop
            while True:
                # Discard cells until acceptable cell is found as next_cell_id
                if not self.next_cell_id:
                    await self._prepare_feeder_cell()
                slot = self.controller.get_next_free_slot()
                if not slot:
                    break  # No free slots available
                if not self.next_cell_id:
                    break  # Continue with process_finished_measurement

                # Pick and place new cell
                cell_id = int(self.next_cell_id)
                self.logger.info(f"Processing cell {cell_id}")
                cell = self.controller.add_cell(cell_id)
                try:
                    await self._eject_cell_from_feeder()
                    # The prepared cell has left the feeder; clear the ID so
                    # the next iteration scans and measures a fresh cell
                    # instead of re-using this one.
                    self.next_cell_id = ""
                    await self.controller.pick_cell_from_feeder()
                    # NOTE(review): not read anywhere in this file; kept in
                    # case external code inspects it - confirm and remove.
                    self.feeder_prepared = False
                    await self.controller.insert_cell_to_slot(cell, slot)
                except Exception as e:
                    self.logger.error(f"Failed to process cell {cell_id}: {e}")
                    break

            # Check for completed measurements and sort cell
            await self.controller.process_finished_measurement()

    def cleanup(self):
        """Release GPIO resources."""
        self.gpio.cleanup()  # Ensure PumpController cleans up gpio


if __name__ == "__main__":
    loader_system = LoaderSystem()
    asyncio.run(loader_system.run())