| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137 |
- import signal
- import sys
- import asyncio
- import logging
- from robot_control.src.robot.controller import RobotController, Cell, CellStatus
- from robot_control.src.utils.config import ConfigParser
- from robot_control.src.utils.logging import setup_logging
- from robot_control.src.vision.datamatrix import DataMatrixReader
- from robot_control.src.api.i2c_handler import I2C, MockI2C
- from robot_control.src.vendor.mcp3428 import MCP3428
- from robot_control.src.robot.pump_controller import PumpController
- from robot_control.src.api.gpio import PiGPIO, MockGPIO
- """
- This is a test script for the loader system.
- It initializes the robot controller, vision system, and I2C handler,
- """
- async def wait_for_enter():
- # Use asyncio.Event to coordinate between input and async code
- event = asyncio.Event()
-
- def input_callback():
- logging.info("Press Enter to continue...")
- input() # Wait for Enter
- event.set()
-
- # Run input in a separate thread to not block the event loop
- loop = asyncio.get_running_loop()
- await loop.run_in_executor(None, input_callback)
- await event.wait()
- class LoaderSystem:
- def __init__(self):
- self.config = ConfigParser().config
- setup_logging(self.config) # Set up logging with config
- gpio_config = self.config.gpio
- if gpio_config.debug:
- self.gpio = MockGPIO()
- else:
- self.gpio = PiGPIO(out_pins=[gpio_config.pump_pin, gpio_config.valve_pin])
- # Initialize vision system
- self.vision = DataMatrixReader(self.config.vision)
-
- # Initialize pump controller
- self.pump_controller = PumpController(self.config, self.gpio)
- i2c_device_class = MCP3428 if not self.config.i2c.debug else MockI2C
- self.i2c = I2C(i2c_device_class)
- self.feeder_queue: asyncio.Queue[int] = asyncio.Queue(self.config.feeder.max_capacity)
- self.defeeder_queue: asyncio.Queue[int] = asyncio.Queue(self.config.defeeder.max_capacity)
- # Initialize robot controller with all required arguments
- self.controller = RobotController(
- self.config,
- self.gpio,
- self.i2c,
- self.vision,
- self.pump_controller,
- self.feeder_queue,
- self.defeeder_queue
- )
-
- self.logger = logging.getLogger(__name__)
- self.logger.info("Initializing LoaderSystem")
- self.vision.initialize()
- # Test stuff
- self.test_slot = self.config.measurement_devices[0].slots[3]
- # Use mock I2C device if debug is enabled
- self.i2c.initialize()
- self.logger.info(f"I2C initialized with {i2c_device_class.__name__}")
- self.pump_controller = PumpController(self.config, self.gpio)
- async def run(self):
- await self.controller.connect()
- await asyncio.gather(
- self._loader_loop(),
- self._poll_i2c_channels()
- )
- async def _poll_i2c_channels(self):
- while True:
- try:
- readings = await self.i2c.read_channels([1, 3, 4])
- for channel, value in readings.items():
- self.logger.debug(f"Channel {channel} reading: {value}")
- if channel == 3: # Pressure reading
- self.pump_controller.handle_tank_reading(value)
- if channel == 4:
- state = self.pump_controller.check_endeffector_state(value)
- self.controller.set_suction_state(state)
- except Exception as e:
- self.logger.error(f"Error polling I2C channels: {str(e)}")
- await asyncio.sleep(1) # Poll every second
- async def _loader_loop(self):
- while True:
- await wait_for_enter()
- await self.controller.prepare_feeder_cell()
- await wait_for_enter()
- self.logger.info("Starting movement sequence...")
- await self.controller.pick_cell_from_slot(self.test_slot)
- await self.controller.dropoff_cell()
- await self.controller.pick_cell_from_feeder()
- cell = Cell(id=1, status=CellStatus.WAITING)
- await self.controller.insert_cell_to_slot(cell, self.test_slot)
- self.logger.info("\nPress Enter to repeat sequence (or Ctrl+C to exit)...")
- async def cleanup(self):
- self.logger.info("Cleaning up resources...")
- await self.controller.cleanup()
- self.gpio.cleanup()
- if __name__ == "__main__":
- loader_system = LoaderSystem()
- async def shutdown():
- loader_system.logger.info("Shutting down...")
- await loader_system.cleanup()
- sys.exit(0)
- def handle_signal(sig, frame):
- asyncio.get_event_loop().create_task(shutdown())
- signal.signal(signal.SIGINT, handle_signal)
- signal.signal(signal.SIGTERM, handle_signal)
- asyncio.run(loader_system.run())
|