diff --git a/engine/pipeline/preprocessing.py b/engine/pipeline/preprocessing.py new file mode 100644 index 0000000..fddc037 --- /dev/null +++ b/engine/pipeline/preprocessing.py @@ -0,0 +1,130 @@ +"""OpenCV preprocessing pipeline for raster-to-vector conversion.""" + +import cv2 +import numpy as np + + +def decode_image(raw_bytes: bytes) -> np.ndarray: + """Decode raw image bytes into a BGR numpy array.""" + buf = np.frombuffer(raw_bytes, dtype=np.uint8) + img = cv2.imdecode(buf, cv2.IMREAD_COLOR) + if img is None: + raise ValueError("Failed to decode image from provided bytes") + return img + + +def to_grayscale(img: np.ndarray) -> np.ndarray: + """Convert BGR image to single-channel grayscale.""" + if len(img.shape) == 2: + return img + return cv2.cvtColor(img, cv2.COLOR_BGR2GRAY) + + +def denoise( + img: np.ndarray, + d: int = 9, + sigma_color: float = 75.0, + sigma_space: float = 75.0, +) -> np.ndarray: + """Apply bilateral filter for edge-preserving denoising.""" + return cv2.bilateralFilter(img, d, sigma_color, sigma_space) + + +def enhance_contrast( + img: np.ndarray, + clip_limit: float = 2.0, + tile_grid_size: tuple[int, int] = (8, 8), +) -> np.ndarray: + """Apply CLAHE contrast enhancement.""" + clahe = cv2.createCLAHE(clipLimit=clip_limit, tileGridSize=tile_grid_size) + return clahe.apply(img) + + +def threshold( + img: np.ndarray, + manual_thresh: int | None = None, +) -> np.ndarray: + """Apply thresholding — Otsu auto by default, manual override if provided.""" + if manual_thresh is not None: + _, result = cv2.threshold(img, manual_thresh, 255, cv2.THRESH_BINARY) + else: + _, result = cv2.threshold(img, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU) + return result + + +def edge_detect( + img: np.ndarray, + low: int = 50, + high: int = 150, +) -> np.ndarray: + """Apply Canny edge detection.""" + return cv2.Canny(img, low, high) + + +def morphological_ops( + img: np.ndarray, + kernel_size: int = 3, + dilate_iterations: int = 1, + erode_iterations: int = 1, +) -> np.ndarray: + """Apply dilation then erosion (closing-style) to clean up binary image.""" + kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (kernel_size, kernel_size)) + result = cv2.dilate(img, kernel, iterations=dilate_iterations) + result = cv2.erode(result, kernel, iterations=erode_iterations) + return result + + +def preprocess( + raw_bytes: bytes, + params: dict | None = None, +) -> np.ndarray: + """Run the full preprocessing pipeline on raw image bytes. + + Stages: decode → grayscale → denoise → contrast → threshold → morphological ops. + Edge detection is optional (enabled via params["edge_detect"] = True). + + All stage parameters are tunable via the params dict. Keys: + denoise_d, denoise_sigma_color, denoise_sigma_space, + clahe_clip_limit, clahe_tile_grid_size, + threshold_manual, + edge_detect (bool), edge_low, edge_high, + morph_kernel_size, morph_dilate_iterations, morph_erode_iterations + """ + p = params or {} + + img = decode_image(raw_bytes) + img = to_grayscale(img) + + img = denoise( + img, + d=p.get("denoise_d", 9), + sigma_color=p.get("denoise_sigma_color", 75.0), + sigma_space=p.get("denoise_sigma_space", 75.0), + ) + + img = enhance_contrast( + img, + clip_limit=p.get("clahe_clip_limit", 2.0), + tile_grid_size=p.get("clahe_tile_grid_size", (8, 8)), + ) + + img = threshold( + img, + manual_thresh=p.get("threshold_manual"), + ) + + if p.get("edge_detect", False): + img = edge_detect( + img, + low=p.get("edge_low", 50), + high=p.get("edge_high", 150), + ) + + img = morphological_ops( + img, + kernel_size=p.get("morph_kernel_size", 3), + dilate_iterations=p.get("morph_dilate_iterations", 1), + erode_iterations=p.get("morph_erode_iterations", 1), + ) + + return img diff --git a/engine/tests/test_preprocessing.py b/engine/tests/test_preprocessing.py new file mode 100644 index 0000000..366d33b --- /dev/null +++ b/engine/tests/test_preprocessing.py @@ -0,0 +1,232 @@ +"""Tests for the OpenCV preprocessing pipeline.""" + +import cv2 +import numpy as np +import pytest + +from pipeline.preprocessing import ( + decode_image, + denoise, + edge_detect, + enhance_contrast, + morphological_ops, + preprocess, + threshold, + to_grayscale, +) + + +def _make_test_image(width: int = 100, height: int = 80, color: bool = True) -> np.ndarray: + """Create a synthetic test image with some structure.""" + if color: + img = np.zeros((height, width, 3), dtype=np.uint8) + # White rectangle on black background + cv2.rectangle(img, (20, 15), (80, 65), (255, 255, 255), -1) + # Gray circle for tonal variation + cv2.circle(img, (50, 40), 15, (128, 128, 128), -1) + else: + img = np.zeros((height, width), dtype=np.uint8) + cv2.rectangle(img, (20, 15), (80, 65), 255, -1) + cv2.circle(img, (50, 40), 15, 128, -1) + return img + + +def _encode_png(img: np.ndarray) -> bytes: + """Encode numpy array to PNG bytes.""" + ok, buf = cv2.imencode(".png", img) + assert ok + return buf.tobytes() + + +# --- decode_image --- + +class TestDecodeImage: + def test_decodes_valid_png(self): + img = _make_test_image() + raw = _encode_png(img) + result = decode_image(raw) + assert result.shape == img.shape + assert result.dtype == np.uint8 + + def test_rejects_invalid_bytes(self): + with pytest.raises(ValueError, match="Failed to decode"): + decode_image(b"not an image") + + def test_decodes_jpeg(self): + img = _make_test_image() + ok, buf = cv2.imencode(".jpg", img) + assert ok + result = decode_image(buf.tobytes()) + assert result.shape == img.shape + + +# --- to_grayscale --- + +class TestToGrayscale: + def test_converts_color_to_gray(self): + img = _make_test_image(color=True) + result = to_grayscale(img) + assert len(result.shape) == 2 + assert result.shape == (80, 100) + + def test_passthrough_already_gray(self): + img = _make_test_image(color=False) + result = to_grayscale(img) + assert len(result.shape) == 2 + np.testing.assert_array_equal(result, img) + + +# --- denoise --- + +class TestDenoise: + def test_returns_same_shape(self): + img = _make_test_image(color=False) + result = denoise(img) + assert result.shape == img.shape + assert result.dtype == np.uint8 + + def test_custom_params(self): + img = _make_test_image(color=False) + result = denoise(img, d=5, sigma_color=50.0, sigma_space=50.0) + assert result.shape == img.shape + + def test_reduces_noise(self): + img = _make_test_image(color=False).astype(np.float64) + rng = np.random.default_rng(42) + noisy = np.clip(img + rng.normal(0, 25, img.shape), 0, 255).astype(np.uint8) + result = denoise(noisy, d=9, sigma_color=75, sigma_space=75) + # Denoised image should be closer to original than noisy version + orig = _make_test_image(color=False) + noise_diff = np.mean(np.abs(noisy.astype(float) - orig.astype(float))) + clean_diff = np.mean(np.abs(result.astype(float) - orig.astype(float))) + assert clean_diff < noise_diff + + +# --- enhance_contrast --- + +class TestEnhanceContrast: + def test_returns_same_shape(self): + img = _make_test_image(color=False) + result = enhance_contrast(img) + assert result.shape == img.shape + + def test_custom_clip_limit(self): + img = _make_test_image(color=False) + result = enhance_contrast(img, clip_limit=4.0, tile_grid_size=(4, 4)) + assert result.shape == img.shape + + def test_increases_dynamic_range(self): + # Low-contrast gray image + img = np.full((80, 100), 120, dtype=np.uint8) + img[20:60, 20:80] = 130 + result = enhance_contrast(img, clip_limit=2.0) + # CLAHE should increase the spread + assert result.std() >= img.std() + + +# --- threshold --- + +class TestThreshold: + def test_otsu_produces_binary(self): + img = _make_test_image(color=False) + result = threshold(img) + unique = set(np.unique(result)) + assert unique <= {0, 255} + + def test_manual_threshold(self): + img = _make_test_image(color=False) + result = threshold(img, manual_thresh=100) + unique = set(np.unique(result)) + assert unique <= {0, 255} + + def test_manual_threshold_value(self): + img = np.array([[50, 150], [100, 200]], dtype=np.uint8) + result = threshold(img, manual_thresh=120) + expected = np.array([[0, 255], [0, 255]], dtype=np.uint8) + np.testing.assert_array_equal(result, expected) + + +# --- edge_detect --- + +class TestEdgeDetect: + def test_returns_same_shape(self): + img = _make_test_image(color=False) + result = edge_detect(img) + assert result.shape == img.shape + + def test_detects_edges(self): + img = np.zeros((100, 100), dtype=np.uint8) + img[25:75, 25:75] = 255 + result = edge_detect(img, low=50, high=150) + # Should have non-zero pixels along the rectangle edges + assert np.count_nonzero(result) > 0 + # Interior and exterior should be zero + assert result[50, 50] == 0 + assert result[0, 0] == 0 + + def test_custom_thresholds(self): + img = _make_test_image(color=False) + result = edge_detect(img, low=100, high=200) + assert result.dtype == np.uint8 + + +# --- morphological_ops --- + +class TestMorphologicalOps: + def test_returns_same_shape(self): + img = threshold(_make_test_image(color=False)) + result = morphological_ops(img) + assert result.shape == img.shape + + def test_fills_small_gaps(self): + # Create binary image with a small hole + img = np.zeros((100, 100), dtype=np.uint8) + img[20:80, 20:80] = 255 + img[49:51, 49:51] = 0 # small gap + result = morphological_ops(img, kernel_size=3, dilate_iterations=1, erode_iterations=1) + # Gap should be filled after dilation + assert result[49, 49] == 255 + + def test_custom_kernel_and_iterations(self): + img = threshold(_make_test_image(color=False)) + result = morphological_ops(img, kernel_size=5, dilate_iterations=2, erode_iterations=2) + assert result.shape == img.shape + + +# --- full pipeline --- + +class TestPreprocess: + def test_end_to_end_default_params(self): + img = _make_test_image() + raw = _encode_png(img) + result = preprocess(raw) + assert len(result.shape) == 2 + assert result.dtype == np.uint8 + unique = set(np.unique(result)) + # After threshold + morph, should be mostly binary + assert unique <= {0, 255} + + def test_with_edge_detection(self): + img = _make_test_image() + raw = _encode_png(img) + result = preprocess(raw, params={"edge_detect": True}) + assert len(result.shape) == 2 + + def test_custom_params(self): + img = _make_test_image() + raw = _encode_png(img) + result = preprocess(raw, params={ + "denoise_d": 5, + "denoise_sigma_color": 50.0, + "clahe_clip_limit": 4.0, + "threshold_manual": 128, + "morph_kernel_size": 5, + "morph_dilate_iterations": 2, + "morph_erode_iterations": 2, + }) + assert result.dtype == np.uint8 + assert len(result.shape) == 2 + + def test_rejects_bad_input(self): + with pytest.raises(ValueError): + preprocess(b"garbage data")