import logging
from pathlib import Path
from typing import Any, Optional, Tuple

import cv2
from pylibdmtx.pylibdmtx import decode

from robot_control.src.utils.config import VisionConfig

logger = logging.getLogger(__name__)

try:
    from picamera2 import Picamera2
    PICAMERA2_AVAILABLE = True
except ImportError:
    PICAMERA2_AVAILABLE = False

# Directory scanned for still images when the reader runs without a camera.
_TEST_FILES_PATH = "./tests/files/"

# File extensions (lower-case) that TestCamera treats as images.
_IMAGE_SUFFIXES = ('.png', '.jpg', '.jpeg')

# Crop rectangle (x, y, w, h) applied in place of ``VisionConfig.bbox``.
# It was hard-coded because the config could not be updated at the time.
# TODO: move these values into the config and set this to None, at which
# point the configured bbox is used again.
_BBOX_OVERRIDE: Optional[Tuple[int, int, int, int]] = (80, 0, 220, 450)


class TestCamera:
    """File-backed stand-in for a camera that replays images from a directory.

    Images are served in sorted filename order and wrap around after the
    last one.
    """

    # The class name starts with "Test"; tell pytest not to collect it.
    __test__ = False

    def __init__(self, test_files_path: str):
        """Collect the image files found directly inside *test_files_path*.

        A missing directory or a directory without images is logged, not
        raised; ``read()`` then reports failure on every call.
        """
        self.image_files = []
        self.current_index = 0

        path = Path(test_files_path)
        if path.exists() and path.is_dir():
            self.image_files = sorted(
                str(f) for f in path.glob("*")
                if f.suffix.lower() in _IMAGE_SUFFIXES
            )
            if not self.image_files:
                logger.warning("No image files found in %s", test_files_path)
        else:
            logger.error("Test files directory not found: %s", test_files_path)

    def read(self) -> Tuple[bool, Optional[Any]]:
        """Return ``(True, frame)`` for the next image, or ``(False, None)``.

        ``(False, None)`` is returned when there are no images or when the
        current file cannot be loaded by ``cv2.imread``.
        """
        if not self.image_files:
            return False, None

        image_path = self.image_files[self.current_index]
        # Advance before loading: otherwise a single unreadable file would be
        # retried forever and block every image that follows it.
        self.current_index = (self.current_index + 1) % len(self.image_files)

        frame = cv2.imread(image_path)
        if frame is None:
            logger.error("Failed to load image: %s", image_path)
            return False, None
        return True, frame

    def release(self) -> None:
        """Do nothing; present so the class can be released like a camera."""
        pass


class DataMatrixReader:
    """Capture a frame and decode a DataMatrix code from it.

    A negative ``config.camera_id`` selects the file-backed ``TestCamera``;
    any other value uses a Picamera2 device.
    """

    def __init__(self, config: VisionConfig):
        """Open the frame source selected by *config*.

        Raises:
            RuntimeError: a real camera was requested but the ``picamera2``
                package could not be imported.
        """
        self.camera_id = config.camera_id
        self.camera = None
        self.bbox = config.bbox
        self.picam2 = None

        if self.camera_id < 0:
            self.camera = TestCamera(_TEST_FILES_PATH)
            return

        if not PICAMERA2_AVAILABLE:
            raise RuntimeError("Picamera2 is required for camera operation.")

        self.picam2 = Picamera2()
        try:
            # NOTE(review): the Picamera2 manual describes "BGR888" arrays as
            # ordered [R, G, B], while read_datamatrix converts with
            # COLOR_BGR2GRAY. Confirm whether "RGB888" is the intended format.
            self.picam2.configure(self.picam2.create_preview_configuration(
                main={"format": "BGR888", "size": (640, 480)}
            ))
            self.picam2.start()
        except Exception:
            # Do not keep the device open if it could not be brought up.
            self.picam2.close()
            self.picam2 = None
            raise

    def read_datamatrix(self) -> Optional[str]:
        """Capture one frame and return the decoded DataMatrix text.

        Returns:
            The payload of the first decoded symbol as a UTF-8 string, or
            ``None`` when no symbol is found.

        Raises:
            RuntimeError: no camera is available or no frame was captured.
        """
        if self.camera_id < 0:
            ret, frame = self.camera.read()
        else:
            if self.picam2 is None:
                raise RuntimeError("No cam available")
            frame = self.picam2.capture_array()
            ret = True

        if not ret or frame is None or frame.size == 0:
            raise RuntimeError("Failed to capture image")

        # DEBUG: save the full captured frame (overwritten on every call).
        cv2.imwrite("./captured_frame.png", frame)

        if self.bbox:
            # Cropping only happens when a bbox is configured, but the
            # rectangle itself comes from _BBOX_OVERRIDE while that is set.
            x, y, w, h = _BBOX_OVERRIDE or self.bbox
            frame = frame[y:y + h, x:x + w]

        # Grayscale + histogram equalization. Median blur, adaptive
        # thresholding and a morphological close were tried here and are
        # currently disabled.
        frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        frame = cv2.equalizeHist(frame)

        # DEBUG: save the preprocessed frame that is handed to the decoder
        # (overwritten on every call).
        cv2.imwrite("./captured_bbox.png", frame)

        # Stop after the first symbol; timeout is in milliseconds.
        decoded = decode(frame, timeout=5000, gap_size=20, max_count=1)
        if not decoded:
            logger.info("No datamatrix found")
            return None

        return decoded[0].data.decode('utf-8')

    def cleanup(self) -> None:
        """Release the frame source; safe to call more than once."""
        if self.camera_id < 0:
            if self.camera:
                self.camera.release()
            return

        if self.picam2 is not None:
            # Drop the reference first so a failure below cannot leave a
            # half-closed camera behind for a later call.
            picam2, self.picam2 = self.picam2, None
            try:
                picam2.stop()
            finally:
                # stop() alone keeps the device held by this process.
                picam2.close()