Преглед на файлове

Add initial project structure with configuration, and control logic

Silas Gruen преди 11 месеца
родител
ревизия
ca651d637a

+ 1 - 0
.gitignore

@@ -0,0 +1 @@
+*.cpython-39.pyc

+ 8 - 0
cell_loader_ctr.code-workspace

@@ -0,0 +1,8 @@
+{
+	"folders": [
+		{
+			"path": "."
+		}
+	],
+	"settings": {}
+}

+ 48 - 0
robot-control/README.md

@@ -0,0 +1,48 @@
+# Robot Control Project
+
+This project implements a 2D portal robot capable of picking battery cells, reading data matrices, managing measurement requests via a REST API, and sorting cells based on measurement results.
+
+## Project Structure
+
+- `src/`: Contains the main application code.
+  - `api/`: Handles the REST API for managing robot tasks.
+  - `robot/`: Contains the robot's control logic and movement.
+  - `vision/`: Manages the vision-related functionalities, including data matrix reading.
+  - `main.py`: The entry point of the application.
+
+- `tests/`: Contains unit tests for the application modules.
+
+- `requirements.txt`: Lists the dependencies required for the project.
+
+## Features
+
+- REST API for starting measurements and managing robot tasks.
+- Robot control for picking, inserting, and sorting battery cells.
+- Navigation and movement logic for the robot.
+- Vision capabilities for reading data matrices from battery cells.
+
+## Installation
+
+1. Clone the repository.
+2. Navigate to the project directory.
+3. Install the required dependencies:
+
+   ```
+   pip install -r requirements.txt
+   ```
+
+## Usage
+
+Run the application using:
+
+```
+python src/main.py
+```
+
+## Testing
+
+To run the tests, use:
+
+```
+pytest
+```

+ 36 - 0
robot-control/config/config.yaml

@@ -0,0 +1,36 @@
+measurement_devices:
+  - id: "device_1"
+    position: [100.0, 200.0, 50.0]
+    slots:
+      - position: [10.0, 20.0, 50.0]
+        occupied: False
+      - position: [20.0, 20.0, 50.0]
+        occupied: False
+      - position: [30.0, 20.0, 50.0]
+        occupied: False
+
+  - id: "device_2"
+    position: [100.0, 300.0, 50.0]
+    slots:
+      - position: [10.0, 20.0, 50.0]
+        occupied: False
+      - position: [20.0, 20.0, 50.0]
+        occupied: False
+      - position: [30.0, 20.0, 50.0]
+        occupied: False
+
+feeder:
+  position: [0.0, 0.0, 50.0]
+  approach_position: [0.0, 0.0, 100.0]
+
+dropoff_grades:
+  accepted:
+    position: [500.0, 0.0, 50.0]
+    capacity_threshold: 0.8
+  rejected:
+    position: [500.0, 100.0, 50.0]
+    capacity_threshold: 0
+
+system_settings:
+  speed: 100.0
+  acceleration: 500.0

+ 7 - 0
robot-control/requirements.txt

@@ -0,0 +1,7 @@
+fastapi[all]
+uvicorn
+opencv-python
+pylibdmtx
+pydantic
+numpy
+PyYAML==6.0.1

+ 0 - 0
robot-control/src/api/__init__.py


+ 23 - 0
robot-control/src/api/routes.py

@@ -0,0 +1,23 @@
+from fastapi import FastAPI, HTTPException
+from typing import Dict
+from pydantic import BaseModel
+
+app = FastAPI()
+
+# TODO[SG]: add real API. THis is just a placeholder
+
+class MeasurementResult(BaseModel):
+    cell_id: str
+    slot: int
+    capacity: float
+    status: str
+
+@app.post("/measurement/start")
+async def start_measurement(slot: int, cell_id: str):
+    # Implementation to start measurement
+    return {"status": "started", "slot": slot, "cell_id": cell_id}
+
+@app.post("/measurement/complete")
+async def complete_measurement(result: MeasurementResult):
+    # Add completed measurement to robot's work queue
+    return {"status": "queued", "slot": result.slot}

+ 37 - 0
robot-control/src/main.py

@@ -0,0 +1,37 @@
+import asyncio
+from fastapi import FastAPI
+from robot.controller import RobotController, Cell
+from api.routes import app as api_app
+from vision.datamatrix import DataMatrixReader
+
+class LoaderSystem:
+    def __init__(self):
+        self.controller = RobotController()
+        self.vision = DataMatrixReader()
+        self.api = api_app
+
+    async def run(self):
+        while True:
+            # Main robot control loop
+            while True:
+                slot = self.controller.get_next_free_slot()
+                if not slot:
+                    break
+                # Pick and place new cell
+                await self.controller.pick_cell_from_feeder()
+                cell_id = await self.controller.read_cell_id()
+                cell = Cell(cell_id)
+                await self.controller.insert_cell_to_slot(cell, slot)
+            
+            # Check for completed measurements
+            if self.controller.work_queue:
+                completed_slot = self.controller.work_queue.pop(0)
+                cell_id = await self.controller.collect_cell_from_slot(completed_slot)
+                cell = Cell(cell_id)
+                await self.controller.sort_cell(cell)
+
+            await asyncio.sleep(0.1)
+
+if __name__ == "__main__":
+    loader_system = LoaderSystem()
+    asyncio.run(loader_system.run())

+ 0 - 0
robot-control/src/robot/__init__.py


+ 81 - 0
robot-control/src/robot/config.py

@@ -0,0 +1,81 @@
+from dataclasses import dataclass
+from typing import List, Dict, Tuple
+import yaml
+from pathlib import Path
+
+@dataclass
+class SlotConfig:
+    position: Tuple[float, float, float]
+    occupied: bool
+    cell_id: str = None
+
+@dataclass
+class DeviceConfig:
+    id: str
+    position: Tuple[float, float, float]
+    slots: List[SlotConfig]
+
+@dataclass
+class FeederConfig:
+    position: Tuple[float, float, float]
+    capacity_threshold: float
+
+@dataclass
+class DropoffGradeConfig:
+    position: Tuple[float, float, float]
+    capacity_threshold: float
+
+@dataclass
+class SystemConfig:
+    speed: float
+    acceleration: float
+
+class ConfigParser:
+    def __init__(self, config_path: str = "config/config.yaml"):
+        self.config_path = Path(config_path)
+        self.config = self._load_config()
+
+    def _load_config(self):
+        with open(self.config_path, 'r') as f:
+            return yaml.safe_load(f)
+
+    def get_devices(self) -> List[DeviceConfig]:
+        devices = []
+        for device in self.config['measurement_devices']:
+            slots = [
+                SlotConfig(
+                    position=tuple(slot['position']),
+                    occupied=slot['occupied'],
+                    cell_id=slot['cell_id'] if slot['occupied'] else None,
+                )
+                for slot in device['slots']
+            ]
+            devices.append(DeviceConfig(
+                id=device['id'],
+                position=tuple(device['position']),
+                slots=slots
+            ))
+        return devices
+
+    def get_feeder(self) -> FeederConfig:
+        feeder = self.config['feeder']
+        return FeederConfig(
+            position=tuple(feeder['position']),
+            approach_position=tuple(feeder['approach_position'])
+        )
+
+    def get_dropoff_grades(self) -> Dict[str, DropoffGradeConfig]:
+        grades = {}
+        for name, grade in self.config['dropoff_grades'].items():
+            grades[name] = DropoffGradeConfig(
+                position=tuple(grade['position']),
+                capacity_threshold=grade['capacity_threshold']
+            )
+        return grades
+
+    def get_system_settings(self) -> SystemConfig:
+        settings = self.config['robot_settings']
+        return SystemConfig(
+            speed=settings['speed'],
+            acceleration=settings['acceleration']
+        )

+ 109 - 0
robot-control/src/robot/controller.py

@@ -0,0 +1,109 @@
+from dataclasses import dataclass
+from enum import Enum
+from typing import List, Tuple
+from .config import ConfigParser, SlotConfig
+import logging
+
+logger = logging.getLogger(__name__)
+
+class CellStatus(Enum):
+    WAITING = "waiting"
+    MEASURING = "measuring"
+    COMPLETED = "completed"
+    FAILED = "failed"
+
+@dataclass
+class Cell:
+    id: str
+    status: CellStatus
+    measurement_slot: int = None
+    capacity: float = None
+
+class DropoffGrade:
+    def __init__(self, name: str, x: float, y: float, z: float):
+        self.name = name
+        self.position = (x, y, z)
+
+class RobotController:
+    def __init__(self):
+        self.config = ConfigParser()
+        self.devices = self.config.get_devices()
+        self.feeder = self.config.get_feeder()
+        self.dropoff_grades = self.config.get_dropoff_grades()
+        self.rejected_grade = self.dropoff_grades['rejected']
+        self.system_settings = self.config.get_system_settings()
+        
+        # Initialize with configured values
+        self.total_slots = sum(len(device.slots) for device in self.devices)
+        self.work_queue: List[SlotConfig] = []
+
+        self.gripper_occupied = False
+
+    async def pick_cell_from_feeder(self):
+        # Implementation for picking cell from feeder
+        logger.info("Picking cell from feeder")
+        self.gripper_occupied = True
+        pass
+
+    async def read_cell_id(self):
+        # Use vision system to read data matrix and return cell ID
+        return None
+
+    def get_next_free_slot(self):
+        for device in self.devices:
+            for slot in device.slots:
+                if not slot.occupied:
+                    logger.info(f"Next free slot found at position {slot.position}")
+                    return slot
+                
+        logger.error("No free slots available")
+        return None
+
+    async def insert_cell_to_next_available(self, cell: Cell):
+        if not self.gripper_occupied:
+            logger.error("Gripper not occupied")
+            return
+        slot = self.get_next_free_slot()
+        if slot:
+            await self.insert_cell_to_slot(cell, slot)
+        else:
+            logger.error("No free slots available")
+
+    async def insert_cell_to_slot(self, cell: Cell, slot: SlotConfig):
+        if slot.occupied:
+            logger.error(f"Slot {slot.id} is already occupied")
+            return
+        if not self.gripper_occupied:
+            logger.error("Gripper not occupied")
+            return
+        slot.occupied = True
+        slot.cell_id = cell
+        logger.info(f"Cell {cell.id} inserted to slot at position {slot.position}")
+        # Move to slot and insert cell
+        pass
+
+    async def collect_cell_from_slot(self, slot: SlotConfig):
+        if self.gripper_occupied:
+            logger.error("Gripper already occupied")
+            return
+        # Collect cell from measurement slot
+        slot.occupied = False
+        slot.cell_id = None
+        logger.info(f"Cell {slot.cell_id} collected from slot at position {slot.position}")
+        return slot.cell_id
+
+    async def sort_cell(self, cell: Cell):
+        if cell.status == CellStatus.FAILED:
+            await self.dropoff_cell(cell.measurement_slot, self.rejected_grade)
+            logger.info(f"Cell {cell.id} sorted to rejected grade")
+            return
+        for name, grade in self.dropoff_grades.items():
+            if cell.capacity >= grade.capacity_threshold:
+                await self.dropoff_cell(cell, grade)
+                logger.info(f"Cell {cell.id} sorted to grade {name}")
+                return
+        logger.error(f"No suitable grade found for cell {cell.id} with capacity {cell.capacity}")
+
+    async def dropoff_cell(self, dropoff_grade: DropoffGrade):
+        # Drop collected cell at position
+        pass

+ 58 - 0
robot-control/src/robot/movement.py

@@ -0,0 +1,58 @@
+from typing import Tuple, Optional
+import time
+
+# TODO[SG]: Implement actual robot movement commands. This is just a placeholder.
+
+class RobotMovement:
+    def __init__(self):
+        self.current_position: Tuple[float, float, float] = (0.0, 0.0, 0.0)  # x, y, z
+        self.speed: float = 100.0  # Default speed in mm/s
+        self.is_moving: bool = False
+        self.home_position: Tuple[float, float, float] = (0.0, 0.0, 0.0)
+
+    def set_speed(self, speed: float) -> None:
+        """Set the movement speed in mm/s."""
+        if speed <= 0:
+            raise ValueError("Speed must be positive")
+        self.speed = speed
+
+    def move_to_position(self, x: float, y: float, z: float) -> bool:
+        """Move to absolute position. Returns True if movement successful."""
+        try:
+            self.is_moving = True
+            # Here you would implement actual robot movement commands
+            self.current_position = (x, y, z)
+            self.is_moving = False
+            return True
+        except Exception as e:
+            self.is_moving = False
+            raise RuntimeError(f"Movement failed: {str(e)}")
+
+    def move_relative(self, dx: float, dy: float, dz: float) -> bool:
+        """Move relative to current position."""
+        new_x = self.current_position[0] + dx
+        new_y = self.current_position[1] + dy
+        new_z = self.current_position[2] + dz
+        return self.move_to_position(new_x, new_y, new_z)
+
+    def home(self) -> bool:
+        """Move to home position."""
+        return self.move_to_position(*self.home_position)
+
+    def get_position(self) -> Tuple[float, float, float]:
+        """Get current position."""
+        return self.current_position
+
+    def stop(self) -> None:
+        """Emergency stop."""
+        self.is_moving = False
+        # Implement emergency stop command here
+
+    def wait_until_idle(self, timeout: Optional[float] = None) -> bool:
+        """Wait until movement is complete or timeout (in seconds) is reached."""
+        start_time = time.time()
+        while self.is_moving:
+            if timeout and (time.time() - start_time) > timeout:
+                return False
+            time.sleep(0.1)
+        return True

+ 0 - 0
robot-control/src/vision/__init__.py


+ 27 - 0
robot-control/src/vision/datamatrix.py

@@ -0,0 +1,27 @@
+import cv2
+from pylibdmtx.pylibdmtx import decode
+
+# TODO[SG]: add real datamatrix code. This is just a placeholder.
+
+class DataMatrixReader:
+    def __init__(self, camera_id: int = 0):
+        self.camera_id = camera_id
+        self.camera = None
+
+    def initialize(self):
+        self.camera = cv2.VideoCapture(self.camera_id)
+
+    def read_datamatrix(self) -> str:
+        ret, frame = self.camera.read()
+        if not ret:
+            raise Exception("Failed to capture image")
+        
+        decoded = decode(frame)
+        if not decoded:
+            raise Exception("No data matrix found")
+            
+        return decoded[0].data.decode('utf-8')
+
+    def cleanup(self):
+        if self.camera:
+            self.camera.release()