| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158 |
- import asyncio
- import logging
- from robot_control.src.robot.controller import RobotController
- from robot_control.src.utils.config import ConfigParser
- from robot_control.src.vision.datamatrix import DataMatrixReader
- from robot_control.src.api.i2c_handler import I2C, MockI2C
- from robot_control.src.vendor.mcp3428 import MCP3428
- from robot_control.src.robot.pump_controller import PumpController
- from robot_control.src.api.gpio import PiGPIO, MockGPIO
# Configure the root logger once at import time: INFO and above, with each
# record rendered as "timestamp - module - level - message".
logging.basicConfig(
    format="%(asctime)s - %(module)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
class LoaderSystem:
    """Top-level orchestrator of the cell loader.

    Wires together the GPIO back-end, the robot controller, the data-matrix
    reader, the I2C ADC and the pump controller, and runs two concurrent
    loops: one that loads cells from the feeder into free slots and one that
    polls the I2C channels.
    """

    def __init__(self):
        """Create all subsystems from the parsed configuration.

        A mock GPIO back-end is used when ``config.gpio.debug`` is set and a
        mock I2C device when ``config.i2c.debug`` is set.
        """
        self.config = ConfigParser().config
        gpio_config = self.config.gpio
        if gpio_config.debug:
            self.gpio = MockGPIO()
        else:
            self.gpio = PiGPIO(out_pins=[gpio_config.pump_pin, gpio_config.valve_pin])
        self.logger = logging.getLogger(__name__)
        self.controller = RobotController(self.config, self.gpio)
        self.vision = DataMatrixReader(self.config.vision)
        self.logger.info("Initializing LoaderSystem")
        self.vision.initialize()
        # Use mock I2C device if debug is enabled
        i2c_device_class = MCP3428 if not self.config.i2c.debug else MockI2C
        self.i2c = I2C(i2c_device_class)
        self.i2c.initialize()
        self.logger.info(f"I2C initialized with {i2c_device_class.__name__}")
        self.pump_controller = PumpController(self.config, self.gpio)
        # Id of the cell currently waiting in the feeder that already passed
        # the voltage check; empty string when no such cell is available.
        self.next_cell_id = ""
        # Legacy flag: written by the loader loop, not read anywhere in this class.
        self.feeder_prepared = False

    async def run(self):
        """Connect to the robot and run the loader and I2C polling loops.

        GPIO resources are released on every exit path, including a failed
        connection attempt, because GPIO is already acquired in ``__init__``.

        Raises:
            Exception: whatever ``controller.connect()`` or either loop raises
                is propagated after cleanup.
        """
        try:
            await self.controller.connect()
            await asyncio.gather(
                self._loader_loop(),
                self._poll_i2c_channels(),
            )
        finally:
            # Announce first so the message precedes the actual teardown.
            self.logger.info("Cleaning up resources...")
            self.cleanup()

    async def _poll_i2c_channels(self):
        """Poll I2C channels 1, 3 and 4 once per second, forever.

        Channel 3 readings are forwarded to the pump controller as tank
        (pressure) readings; channel 4 readings are converted to an
        end-effector state that is pushed to the robot controller. Errors are
        logged and polling continues.
        """
        while True:
            try:
                readings = await self.i2c.read_channels([1, 3, 4])
                for channel, value in readings.items():
                    self.logger.debug(f"Channel {channel} reading: {value}")
                    if channel == 3:  # Pressure reading
                        self.pump_controller.handle_tank_reading(value)
                    elif channel == 4:
                        state = self.pump_controller.check_endeffector_state(value)
                        self.controller.set_suction_state(state)
            except Exception as e:
                # Best effort: a failed poll must not stop the loader.
                self.logger.error(f"Error polling I2C channels: {e}")
            await asyncio.sleep(1)  # Poll every second

    def check_cell_voltage(self, voltage, cell_id_str = ""):
        """Decide whether a measured cell voltage is acceptable.

        Args:
            voltage: Measured cell voltage.
            cell_id_str: Cell identifier, used only in log messages.

        Returns:
            ``True`` when the voltage is non-negative and at least
            ``abs(config.feeder.min_voltage)``; ``False`` otherwise.
        """
        # Polarity must be tested first: every negative value is also below
        # the (non-negative) threshold, so the opposite order would make this
        # branch unreachable and misreport reversed cells as "too low".
        if voltage < 0:
            self.logger.info(f"Cell {cell_id_str} has wrong polarity, discarding cell")
            return False
        if voltage < abs(self.config.feeder.min_voltage):
            self.logger.info(f"Cell {cell_id_str} voltage too low, discarding cell")
            return False

        self.logger.info(f"Cell {cell_id_str} voltage({voltage}) is good")
        return True

    async def _prepare_feeder_cell(self) -> None:
        """Make an acceptable cell available in the feeder, if possible.

        Repeatedly reads the data matrix and measures the cell voltage via the
        probe. Cells that fail the voltage check are ejected, picked up and
        dropped off, and the next cell is examined.

        The result is reported through ``self.next_cell_id``: it holds the id
        of the accepted cell on success and is empty when no cell was detected
        or discarding a cell failed. The method itself returns ``None``.
        """
        io_conf = self.config.gpio
        while not self.next_cell_id:
            # Normalize a missing reading to "" so the attribute stays a string.
            self.next_cell_id = self.vision.read_datamatrix() or ""
            if not self.next_cell_id:
                self.logger.debug("No cell detected")
                return  # No cell detected

            self.gpio.set_pin(io_conf.probe_pin, 1)
            try:
                await asyncio.sleep(0.1)  # Wait for probe to deploy
                cell_v = await self.i2c.read_channel(1)  # Measure cell voltage
            finally:
                # Always retract the probe, even if the measurement raised.
                self.gpio.set_pin(io_conf.probe_pin, 0)
            self.logger.debug(f"Cell voltage: {cell_v}")

            if self.check_cell_voltage(cell_v, self.next_cell_id):
                return  # Desired case!

            # Discard cell directly from feeder. Remember the id before it is
            # cleared so a failure can still be attributed to the right cell.
            rejected_id = self.next_cell_id
            self.logger.info(f"Cell {rejected_id} voltage({cell_v}) is bad, discarding cell")
            self.gpio.do_step(io_conf.measure_dir_pin, io_conf.measure_step_pin, 1600, 200)  # Exactly half a turn
            await asyncio.sleep(1)  # Wait for cell to be ejected
            self.next_cell_id = ""
            try:
                await self.controller.pick_cell_from_feeder()
                await self.controller.dropoff_cell()
            except Exception as e:
                self.logger.error(f"Failed to process cell {rejected_id}: {e}")
                return

    async def _loader_loop(self):
        """
        Main loop for the loader system.
        Checks for free slots and tries to fill them with cells.
        If no more free slots are available, it checks for completed measurements
        and sorts the cells accordingly.
        """
        io_conf = self.config.gpio
        while True:
            await asyncio.sleep(0.1)  # avoid busy loop
            # Check for free slots loop
            while True:
                # Discard cells until acceptable cell is found as next_cell_id
                if not self.next_cell_id:
                    await self._prepare_feeder_cell()
                slot = self.controller.get_next_free_slot()
                if not slot:
                    break  # No free slots available
                if not self.next_cell_id:
                    break  # Continue with process_finished_measurement
                # Pick and place new cell
                cell_id = int(self.next_cell_id)
                self.logger.info(f"Processing cell {cell_id}")
                cell = self.controller.add_cell(cell_id)
                try:
                    self.gpio.do_step(io_conf.measure_dir_pin, io_conf.measure_step_pin, 1600, 200)  # Exactly half a turn
                    await asyncio.sleep(1)  # Wait for cell to be ejected
                    await self.controller.pick_cell_from_feeder()
                    # The prepared cell has left the feeder: forget its id so
                    # the next iteration reads and checks a fresh cell instead
                    # of registering the same id for another slot.
                    self.next_cell_id = ""
                    self.feeder_prepared = False
                    await self.controller.insert_cell_to_slot(cell, slot)
                except Exception as e:
                    # NOTE(review): if the failure happens before the pick
                    # completes, next_cell_id is kept and the cell is retried
                    # on the next pass - confirm this is the desired recovery.
                    self.logger.error(f"Failed to process cell {cell_id}: {e}")
                    break
            # Check for completed measurements and sort cell
            await self.controller.process_finished_measurement()

    def cleanup(self):
        """Release the GPIO resources held by this system."""
        self.gpio.cleanup()  # Ensure PumpController cleans up gpio
if __name__ == "__main__":
    # Script entry point: build the system, then drive its main coroutine on
    # a fresh event loop until it finishes or raises.
    loader_system = LoaderSystem()
    main_coroutine = loader_system.run()
    asyncio.run(main_coroutine)
|