|
|
@@ -7,9 +7,13 @@ import logging
|
|
|
from robot_control.src.api.mqtt_handler import MQTTHandler, MeasurementResult
|
|
|
from robot_control.src.api.grbl_handler import GRBLHandler
|
|
|
from robot_control.src.api.gpio import GPIOInterface
|
|
|
+import asyncio
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
+HALF_TURN_STEPS = 1600 # Number of steps for half a turn
|
|
|
+SMOOTH_STEP_DELAY = 200 # Delay in microseconds for smooth stepping
|
|
|
+
|
|
|
class CellStatus(Enum):
|
|
|
WAITING = "WAITING"
|
|
|
MEASURING = "MEASURING"
|
|
|
@@ -23,12 +27,17 @@ class Cell:
|
|
|
capacity: float = 0
|
|
|
|
|
|
class RobotController:
|
|
|
- def __init__(self, config: RobotConfig, gpio_handler: GPIOInterface):
|
|
|
+ def __init__(self, config: RobotConfig, gpio_handler: GPIOInterface, i2c, vision, pump_controller):
|
|
|
self.config = config
|
|
|
self.cells: dict[int, Cell] = {}
|
|
|
self.devices = self.config.measurement_devices
|
|
|
self.feeder = self.config.feeder
|
|
|
self.dropoff_grades = self.config.dropoff_grades
|
|
|
+ self.gpio = gpio_handler
|
|
|
+ self.i2c = i2c
|
|
|
+ self.vision = vision
|
|
|
+ self.pump_controller = pump_controller
|
|
|
+ self.next_cell_id = ""
|
|
|
|
|
|
# Initialize robot movement
|
|
|
self.grbl_handler = GRBLHandler(
|
|
|
@@ -37,7 +46,6 @@ class RobotController:
|
|
|
)
|
|
|
self.movement = RobotMovement(self.config.movement, self.grbl_handler)
|
|
|
|
|
|
- self.gpio = gpio_handler
|
|
|
self.valve_pin = self.config.gpio.valve_pin
|
|
|
|
|
|
# Initialize with configured values
|
|
|
@@ -337,4 +345,70 @@ class RobotController:
|
|
|
# Move back to safe height
|
|
|
logger.info(f"Moving to dropoff position (safe) {safe_pos}...")
|
|
|
await self.movement.move_to_position(*safe_pos)
|
|
|
- logger.info(f"Cell dropped off at grade {dropoff_grade.id}")
|
|
|
+ logger.info(f"Cell dropped off at grade {dropoff_grade.id}")
|
|
|
+
|
|
|
+ def check_cell_voltage(self, voltage, cell_id_str = ""):
|
|
|
+ if voltage < abs(self.config.feeder.min_voltage):
|
|
|
+ logger.info(f"Cell {cell_id_str} voltage too low, discarding cell")
|
|
|
+ return False
|
|
|
+ if voltage < 0:
|
|
|
+ logger.info(f"Cell {cell_id_str} has wrong polarity, discarding cell")
|
|
|
+ return False
|
|
|
+ logger.info(f"Cell {cell_id_str} voltage({voltage}) is good")
|
|
|
+ return True
|
|
|
+
|
|
|
+ async def prepare_feeder_cell(self) -> bool:
|
|
|
+ """
|
|
|
+ Handles the process of preparing a cell in the feeder. Loops until a suitable cell is found or no cell is detected.
|
|
|
+ Returns True if a cell is prepared, otherwise False.
|
|
|
+ """
|
|
|
+ io_conf = self.config.gpio
|
|
|
+ while not self.next_cell_id:
|
|
|
+ self.next_cell_id = self.vision.read_datamatrix()
|
|
|
+ if not self.next_cell_id:
|
|
|
+ logger.debug("No cell detected")
|
|
|
+ return False # No cell detected
|
|
|
+ self.gpio.set_pin(io_conf.probe_pin, 1)
|
|
|
+ await asyncio.sleep(0.1) # Wait for probe to deploy
|
|
|
+ cell_v = await self.i2c.read_channel(1) # Measure cell voltage
|
|
|
+ self.gpio.set_pin(io_conf.probe_pin, 0)
|
|
|
+ logger.debug(f"Cell voltage: {cell_v}")
|
|
|
+ if self.check_cell_voltage(cell_v, self.next_cell_id):
|
|
|
+ return True # Desired case!
|
|
|
+ # Discard cell directly from feeder
|
|
|
+ logger.info(f"Cell {self.next_cell_id} voltage({cell_v}) is bad, discarding cell")
|
|
|
+ self.gpio.do_step(io_conf.measure_dir_pin, io_conf.measure_step_pin, HALF_TURN_STEPS, SMOOTH_STEP_DELAY)
|
|
|
+ await asyncio.sleep(1) # Wait for cell to be ejected
|
|
|
+ try:
|
|
|
+ self.next_cell_id = ""
|
|
|
+ await self.pick_cell_from_feeder()
|
|
|
+ await self.dropoff_cell()
|
|
|
+ except Exception as e:
|
|
|
+ logger.error(f"Failed to process cell {self.next_cell_id}: {str(e)}")
|
|
|
+ return False
|
|
|
+ return True
|
|
|
+
|
|
|
+ async def fill_next_free_slot(self) -> bool:
|
|
|
+ """
|
|
|
+ Picks and places a new cell into the next free slot. Returns True if a cell was placed, False otherwise.
|
|
|
+ """
|
|
|
+ slot = self.get_next_free_slot()
|
|
|
+ if not slot:
|
|
|
+ return False # No free slots available
|
|
|
+ if not self.next_cell_id:
|
|
|
+ return False # No cell prepared
|
|
|
+ cell_id = int(self.next_cell_id)
|
|
|
+ logger.info(f"Processing cell {cell_id}")
|
|
|
+ cell = self.add_cell(cell_id)
|
|
|
+ io_conf = self.config.gpio
|
|
|
+ try:
|
|
|
+ self.gpio.do_step(io_conf.measure_dir_pin, io_conf.measure_step_pin, HALF_TURN_STEPS, SMOOTH_STEP_DELAY)
|
|
|
+ await asyncio.sleep(1) # Wait for cell to be ejected
|
|
|
+ await self.pick_cell_from_feeder()
|
|
|
+ self.feeder_prepared = False
|
|
|
+ await self.insert_cell_to_slot(cell, slot)
|
|
|
+ self.next_cell_id = "" # Reset after successful placement
|
|
|
+ return True
|
|
|
+ except Exception as e:
|
|
|
+ logger.error(f"Failed to process cell {cell_id}: {str(e)}")
|
|
|
+ return False
|