| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117 |
- import cv2
- from pylibdmtx.pylibdmtx import decode
- import logging
- from pathlib import Path
- from robot_control.src.utils.config import VisionConfig
- from typing import Optional
- logger = logging.getLogger(__name__)
- try:
- from picamera2 import Picamera2
- PICAMERA2_AVAILABLE = True
- except ImportError:
- PICAMERA2_AVAILABLE = False
class TestCamera:
    """File-backed stand-in for a camera that replays images from a directory.

    ``read()`` returns an ``(ok, frame)`` tuple and cycles through the image
    files found in the directory, in sorted path order, wrapping around after
    the last one.
    """

    # Lower-case file extensions that are treated as loadable images.
    _IMAGE_SUFFIXES = (".png", ".jpg", ".jpeg")

    def __init__(self, test_files_path: str):
        """Collect the image files located directly inside *test_files_path*.

        A missing directory or a directory without images is only logged;
        the instance is still created and ``read()`` then reports failure.
        """
        self.image_files = []
        self.current_index = 0

        path = Path(test_files_path)
        if not path.is_dir():
            logger.error(f"Test files directory not found: {test_files_path}")
            return

        # Only regular files qualify: a directory that happens to be named
        # "something.png" can never be loaded as an image.
        self.image_files = sorted(
            str(entry)
            for entry in path.glob("*")
            if entry.suffix.lower() in self._IMAGE_SUFFIXES and entry.is_file()
        )
        if not self.image_files:
            logger.warning(f"No image files found in {test_files_path}")

    def read(self):
        """Load the next image in the cycle.

        Returns:
            ``(True, frame)`` on success, or ``(False, None)`` when there are
            no image files or the current file could not be loaded.
        """
        if not self.image_files:
            return False, None

        image_path = self.image_files[self.current_index]
        # Advance before loading so that an unreadable file is skipped on the
        # next call instead of being retried forever.
        self.current_index = (self.current_index + 1) % len(self.image_files)

        frame = cv2.imread(image_path)
        if frame is None:
            logger.error(f"Failed to load image: {image_path}")
            return False, None
        return True, frame

    def release(self):
        """Do nothing; there is no device handle to release."""
        pass
class DataMatrixReader:
    """Capture a frame and decode a single Data Matrix code from it.

    A negative ``config.camera_id`` selects the file-backed ``TestCamera``;
    any other value opens a Picamera2 device.
    """

    # Directory replayed by TestCamera when camera_id is negative.
    _TEST_FILES_PATH = "./tests/files/"

    # TODO(review): hard-coded crop (x, y, w, h) that is applied INSTEAD of the
    # values in config.bbox whenever config.bbox is truthy. It was added as a
    # workaround because the config could not be updated; move these values
    # into the config and unpack self.bbox again once that is possible.
    _BBOX_OVERRIDE = (80, 0, 220, 450)

    def __init__(self, config: VisionConfig):
        """Open the frame source selected by ``config.camera_id``.

        Raises:
            RuntimeError: if a real camera is requested but Picamera2 could
                not be imported.
        """
        self.camera_id = config.camera_id
        self.camera = None
        self.bbox = config.bbox
        self.picam2 = None

        if self.camera_id < 0:
            self.camera = TestCamera(self._TEST_FILES_PATH)
            return

        if not PICAMERA2_AVAILABLE:
            raise RuntimeError("Picamera2 is required for camera operation.")

        picam2 = Picamera2()
        try:
            # NOTE(review): the Picamera2 manual describes "BGR888" arrays as
            # R,G,B ordered, while frames are later converted with
            # cv2.COLOR_BGR2GRAY - confirm the channel order is as intended.
            picam2.configure(picam2.create_preview_configuration(
                main={"format": "BGR888", "size": (640, 480)}
            ))
            picam2.start()
        except Exception:
            # Do not leave the device claimed when setup fails half-way.
            picam2.close()
            raise
        self.picam2 = picam2

    def read_datamatrix(self) -> Optional[str]:
        """Capture one frame and return the decoded Data Matrix text.

        Returns:
            The payload of the first decoded symbol as a UTF-8 string, or
            ``None`` when no symbol was found.

        Raises:
            RuntimeError: if no camera is available or no frame was captured.
        """
        frame = self._capture_frame()

        # DEBUG: dump the full, uncropped frame (overwritten on every call).
        cv2.imwrite("./captured_frame.png", frame)

        frame = self._preprocess(frame)

        # DEBUG: dump the cropped + equalized image that is handed to the
        # decoder (overwritten on every call).
        cv2.imwrite("./captured_bbox.png", frame)

        decoded = decode(frame, timeout=5000, gap_size=20, max_count=1)
        if not decoded:
            logger.info("No datamatrix found")
            return None

        return decoded[0].data.decode('utf-8')

    def _capture_frame(self):
        """Return one non-empty frame from the active source or raise."""
        if self.camera_id < 0:
            ret, frame = self.camera.read()
        else:
            if self.picam2 is None:
                raise RuntimeError("No cam available")
            frame = self.picam2.capture_array()
            ret = True

        if not ret or frame is None or frame.size == 0:
            raise RuntimeError("Failed to capture image")
        return frame

    def _preprocess(self, frame):
        """Crop to the region of interest, convert to grayscale and equalize.

        Median blur, adaptive thresholding and morphological closing are
        deliberately not applied here.
        """
        if self.bbox:
            x, y, w, h = self._BBOX_OVERRIDE
            frame = frame[y:y + h, x:x + w]
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        return cv2.equalizeHist(gray)

    def cleanup(self):
        """Release the frame source; safe to call more than once."""
        if self.camera_id < 0:
            if self.camera:
                self.camera.release()
            return

        picam2, self.picam2 = self.picam2, None
        if picam2 is None:
            return
        try:
            picam2.stop()
        finally:
            # stop() alone keeps the device claimed by this process; close()
            # releases it so the camera can be opened again.
            picam2.close()
|