datamatrix.py 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117
  1. import cv2
  2. from pylibdmtx.pylibdmtx import decode
  3. import logging
  4. from pathlib import Path
  5. from robot_control.src.utils.config import VisionConfig
  6. from typing import Optional
  7. logger = logging.getLogger(__name__)
  8. try:
  9. from picamera2 import Picamera2
  10. PICAMERA2_AVAILABLE = True
  11. except ImportError:
  12. PICAMERA2_AVAILABLE = False
  13. class TestCamera:
  14. def __init__(self, test_files_path: str):
  15. self.image_files = []
  16. self.current_index = 0
  17. # Load all image files from the test directory
  18. path = Path(test_files_path)
  19. if path.exists() and path.is_dir():
  20. self.image_files = sorted([
  21. str(f) for f in path.glob("*")
  22. if f.suffix.lower() in ['.png', '.jpg', '.jpeg']
  23. ])
  24. if not self.image_files:
  25. logger.warning(f"No image files found in {test_files_path}")
  26. else:
  27. logger.error(f"Test files directory not found: {test_files_path}")
  28. def read(self):
  29. if not self.image_files:
  30. return False, None
  31. # Read next image file in sequence
  32. image_path = self.image_files[self.current_index]
  33. frame = cv2.imread(image_path)
  34. if frame is None:
  35. logger.error(f"Failed to load image: {image_path}")
  36. return False, None
  37. # Cycle through available images
  38. self.current_index = (self.current_index + 1) % len(self.image_files)
  39. return True, frame
  40. def release(self):
  41. pass
  42. class DataMatrixReader:
  43. def __init__(self, config: VisionConfig):
  44. self.camera_id = config.camera_id
  45. self.camera = None
  46. self.bbox = config.bbox
  47. self.picam2 = None
  48. if self.camera_id < 0:
  49. test_path = "./tests/files/"
  50. self.camera = TestCamera(test_path)
  51. else:
  52. if not PICAMERA2_AVAILABLE:
  53. raise RuntimeError("Picamera2 is required for camera operation.")
  54. self.picam2 = Picamera2()
  55. self.picam2.configure(self.picam2.create_preview_configuration(
  56. main={"format": "BGR888", "size": (640, 480)}
  57. ))
  58. self.picam2.start()
  59. def read_datamatrix(self) -> Optional[str]:
  60. if self.camera_id < 0:
  61. # Use TestCamera
  62. ret, frame = self.camera.read()
  63. else:
  64. # Use Picamera2
  65. if not self.picam2:
  66. raise Exception("No cam available")
  67. frame = self.picam2.capture_array()
  68. ret = frame is not None and frame.size != 0
  69. if not ret or frame is None or frame.size == 0:
  70. raise Exception("Failed to capture image")
  71. # DEBUG: added (save cropped bbox frame, overwrites each time)
  72. cv2.imwrite("./captured_frame.png", frame)
  73. if self.bbox:
  74. #x, y, w, h = self.bbox
  75. x, y, w, h = 80, 0, 220, 450 #Added values for bbox here as i couldnt update config
  76. frame = frame[y:y+h, x:x+w]
  77. # Convert to grayscale, apply histogram equalization, blur, and adaptive thresholding
  78. frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
  79. frame = cv2.equalizeHist(frame)
  80. # DEBUG: added (save cropped bbox frame)
  81. cv2.imwrite("./captured_bbox.png", frame)
  82. # frame = cv2.medianBlur(frame, 3)
  83. # frame = cv2.adaptiveThreshold(frame, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
  84. # cv2.THRESH_BINARY, 21, 2)
  85. # kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (3,3))
  86. # frame = cv2.morphologyEx(frame, cv2.MORPH_CLOSE, kernel)
  87. decoded = decode(frame, timeout=5000, gap_size=20, max_count=1)
  88. if not decoded:
  89. logger.info("No datamatrix found")
  90. return None
  91. decoded_str = decoded[0].data.decode('utf-8')
  92. return decoded_str
  93. def cleanup(self):
  94. if self.camera_id < 0:
  95. if self.camera:
  96. self.camera.release()
  97. else:
  98. if self.picam2:
  99. self.picam2.stop()