| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182 |
- import asyncio
- from robot_control.src.utils.logging import LoggerSingleton
- from robot_control.src.robot.controller import RobotController
- from robot_control.src.utils.config import ConfigParser
- from robot_control.src.vision.datamatrix import DataMatrixReader
- from robot_control.src.api.i2c_handler import I2C
- from robot_control.src.vendor.mcp3428 import MCP3428
- from robot_control.src.robot.pump_controller import PumpController
- class LoaderSystem:
- def __init__(self):
- self.config = ConfigParser()
- self.logger = LoggerSingleton.get_logger(self.config)
- self.controller = RobotController(self.config)
- cam = self.config.get_vision_config().get("camera_id")
- self.vision = DataMatrixReader(cam)
- self.logger.info("Initializing LoaderSystem")
- self.vision.initialize()
- self.i2c = I2C(MCP3428)
- self.i2c.initialize()
- self.logger.info("I2C initialized for MCP3428")
- self.pump_controller = PumpController(self.config, self.logger)
- async def run(self):
- await self.controller.connect()
- await asyncio.gather(
- self._loader_loop(),
- self._poll_i2c_channels()
- )
- async def _poll_i2c_channels(self):
- while True:
- try:
- readings = await self.i2c.read_channels([1, 3, 4])
- for channel, value in readings.items():
- self.logger.info(f"Channel {channel} reading: {value}")
- if channel == 3: # Pressure reading
- self.pump_controller.handle_tank_reading(value)
- if channel == 4:
- state = self.pump_controller.check_endeffector_state(value)
- self.controller.set_suction_state(state)
- # Perform a generic task based on the reading
- if value > 0.5: # Example condition
- self.logger.warning(f"Value {value} on channel {channel} exceeds threshold")
- except Exception as e:
- self.logger.error(f"Error polling I2C channels: {str(e)}")
- await asyncio.sleep(1) # Poll every second
- async def _loader_loop(self):
- while True:
- await asyncio.sleep(0.1) # avoid busy loop
- # Check for free slots loop
- while True:
- slot = self.controller.get_next_free_slot()
- if not slot:
- break
- # Pick and place new cell
- cell_id = self.vision.read_datamatrix()
- if not cell_id:
- self.logger.debug("No cell detected")
- break
- self.logger.info(f"Processing cell {cell_id}")
- cell = self.controller.add_cell(cell_id)
- try:
- await self.controller.pick_cell_from_feeder()
- await self.controller.insert_cell_to_slot(cell, slot)
- except Exception as e:
- self.logger.error(f"Failed to process cell {cell_id}: {str(e)}")
- break
- # Check for completed measurements and sort cell
- await self.controller.process_finished_measurement()
- def __del__(self):
- self.pump_controller.cleanup() # Ensure PumpController cleans up gpio
- if __name__ == "__main__":
- loader_system = LoaderSystem()
- asyncio.run(loader_system.run())
|