test: Implemented full OpenCV preprocessing pipeline (grayscale, bilate…
- engine/pipeline/preprocessing.py - engine/tests/test_preprocessing.py GSD-Task: S01/T02
This commit is contained in:
parent
7411bf3ed4
commit
816ba43cd6
2 changed files with 362 additions and 0 deletions
130
engine/pipeline/preprocessing.py
Normal file
130
engine/pipeline/preprocessing.py
Normal file
|
|
@ -0,0 +1,130 @@
|
|||
"""OpenCV preprocessing pipeline for raster-to-vector conversion."""
|
||||
|
||||
import cv2
|
||||
import numpy as np
|
||||
|
||||
|
||||
def decode_image(raw_bytes: bytes) -> np.ndarray:
|
||||
"""Decode raw image bytes into a BGR numpy array."""
|
||||
buf = np.frombuffer(raw_bytes, dtype=np.uint8)
|
||||
img = cv2.imdecode(buf, cv2.IMREAD_COLOR)
|
||||
if img is None:
|
||||
raise ValueError("Failed to decode image from provided bytes")
|
||||
return img
|
||||
|
||||
|
||||
def to_grayscale(img: np.ndarray) -> np.ndarray:
|
||||
"""Convert BGR image to single-channel grayscale."""
|
||||
if len(img.shape) == 2:
|
||||
return img
|
||||
return cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
|
||||
|
||||
|
||||
def denoise(
|
||||
img: np.ndarray,
|
||||
d: int = 9,
|
||||
sigma_color: float = 75.0,
|
||||
sigma_space: float = 75.0,
|
||||
) -> np.ndarray:
|
||||
"""Apply bilateral filter for edge-preserving denoising."""
|
||||
return cv2.bilateralFilter(img, d, sigma_color, sigma_space)
|
||||
|
||||
|
||||
def enhance_contrast(
|
||||
img: np.ndarray,
|
||||
clip_limit: float = 2.0,
|
||||
tile_grid_size: tuple[int, int] = (8, 8),
|
||||
) -> np.ndarray:
|
||||
"""Apply CLAHE contrast enhancement."""
|
||||
clahe = cv2.createCLAHE(clipLimit=clip_limit, tileGridSize=tile_grid_size)
|
||||
return clahe.apply(img)
|
||||
|
||||
|
||||
def threshold(
|
||||
img: np.ndarray,
|
||||
manual_thresh: int | None = None,
|
||||
) -> np.ndarray:
|
||||
"""Apply thresholding — Otsu auto by default, manual override if provided."""
|
||||
if manual_thresh is not None:
|
||||
_, result = cv2.threshold(img, manual_thresh, 255, cv2.THRESH_BINARY)
|
||||
else:
|
||||
_, result = cv2.threshold(img, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)
|
||||
return result
|
||||
|
||||
|
||||
def edge_detect(
|
||||
img: np.ndarray,
|
||||
low: int = 50,
|
||||
high: int = 150,
|
||||
) -> np.ndarray:
|
||||
"""Apply Canny edge detection."""
|
||||
return cv2.Canny(img, low, high)
|
||||
|
||||
|
||||
def morphological_ops(
|
||||
img: np.ndarray,
|
||||
kernel_size: int = 3,
|
||||
dilate_iterations: int = 1,
|
||||
erode_iterations: int = 1,
|
||||
) -> np.ndarray:
|
||||
"""Apply dilation then erosion (closing-style) to clean up binary image."""
|
||||
kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (kernel_size, kernel_size))
|
||||
result = cv2.dilate(img, kernel, iterations=dilate_iterations)
|
||||
result = cv2.erode(result, kernel, iterations=erode_iterations)
|
||||
return result
|
||||
|
||||
|
||||
def preprocess(
|
||||
raw_bytes: bytes,
|
||||
params: dict | None = None,
|
||||
) -> np.ndarray:
|
||||
"""Run the full preprocessing pipeline on raw image bytes.
|
||||
|
||||
Stages: decode → grayscale → denoise → contrast → threshold → morphological ops.
|
||||
Edge detection is optional (enabled via params["edge_detect"] = True).
|
||||
|
||||
All stage parameters are tunable via the params dict. Keys:
|
||||
denoise_d, denoise_sigma_color, denoise_sigma_space,
|
||||
clahe_clip_limit, clahe_tile_grid_size,
|
||||
threshold_manual,
|
||||
edge_detect (bool), edge_low, edge_high,
|
||||
morph_kernel_size, morph_dilate_iterations, morph_erode_iterations
|
||||
"""
|
||||
p = params or {}
|
||||
|
||||
img = decode_image(raw_bytes)
|
||||
img = to_grayscale(img)
|
||||
|
||||
img = denoise(
|
||||
img,
|
||||
d=p.get("denoise_d", 9),
|
||||
sigma_color=p.get("denoise_sigma_color", 75.0),
|
||||
sigma_space=p.get("denoise_sigma_space", 75.0),
|
||||
)
|
||||
|
||||
img = enhance_contrast(
|
||||
img,
|
||||
clip_limit=p.get("clahe_clip_limit", 2.0),
|
||||
tile_grid_size=p.get("clahe_tile_grid_size", (8, 8)),
|
||||
)
|
||||
|
||||
img = threshold(
|
||||
img,
|
||||
manual_thresh=p.get("threshold_manual"),
|
||||
)
|
||||
|
||||
if p.get("edge_detect", False):
|
||||
img = edge_detect(
|
||||
img,
|
||||
low=p.get("edge_low", 50),
|
||||
high=p.get("edge_high", 150),
|
||||
)
|
||||
|
||||
img = morphological_ops(
|
||||
img,
|
||||
kernel_size=p.get("morph_kernel_size", 3),
|
||||
dilate_iterations=p.get("morph_dilate_iterations", 1),
|
||||
erode_iterations=p.get("morph_erode_iterations", 1),
|
||||
)
|
||||
|
||||
return img
|
||||
232
engine/tests/test_preprocessing.py
Normal file
232
engine/tests/test_preprocessing.py
Normal file
|
|
@ -0,0 +1,232 @@
|
|||
"""Tests for the OpenCV preprocessing pipeline."""
|
||||
|
||||
import cv2
|
||||
import numpy as np
|
||||
import pytest
|
||||
|
||||
from pipeline.preprocessing import (
|
||||
decode_image,
|
||||
denoise,
|
||||
edge_detect,
|
||||
enhance_contrast,
|
||||
morphological_ops,
|
||||
preprocess,
|
||||
threshold,
|
||||
to_grayscale,
|
||||
)
|
||||
|
||||
|
||||
def _make_test_image(width: int = 100, height: int = 80, color: bool = True) -> np.ndarray:
|
||||
"""Create a synthetic test image with some structure."""
|
||||
if color:
|
||||
img = np.zeros((height, width, 3), dtype=np.uint8)
|
||||
# White rectangle on black background
|
||||
cv2.rectangle(img, (20, 15), (80, 65), (255, 255, 255), -1)
|
||||
# Gray circle for tonal variation
|
||||
cv2.circle(img, (50, 40), 15, (128, 128, 128), -1)
|
||||
else:
|
||||
img = np.zeros((height, width), dtype=np.uint8)
|
||||
cv2.rectangle(img, (20, 15), (80, 65), 255, -1)
|
||||
cv2.circle(img, (50, 40), 15, 128, -1)
|
||||
return img
|
||||
|
||||
|
||||
def _encode_png(img: np.ndarray) -> bytes:
|
||||
"""Encode numpy array to PNG bytes."""
|
||||
ok, buf = cv2.imencode(".png", img)
|
||||
assert ok
|
||||
return buf.tobytes()
|
||||
|
||||
|
||||
# --- decode_image ---
|
||||
|
||||
class TestDecodeImage:
|
||||
def test_decodes_valid_png(self):
|
||||
img = _make_test_image()
|
||||
raw = _encode_png(img)
|
||||
result = decode_image(raw)
|
||||
assert result.shape == img.shape
|
||||
assert result.dtype == np.uint8
|
||||
|
||||
def test_rejects_invalid_bytes(self):
|
||||
with pytest.raises(ValueError, match="Failed to decode"):
|
||||
decode_image(b"not an image")
|
||||
|
||||
def test_decodes_jpeg(self):
|
||||
img = _make_test_image()
|
||||
ok, buf = cv2.imencode(".jpg", img)
|
||||
assert ok
|
||||
result = decode_image(buf.tobytes())
|
||||
assert result.shape == img.shape
|
||||
|
||||
|
||||
# --- to_grayscale ---
|
||||
|
||||
class TestToGrayscale:
|
||||
def test_converts_color_to_gray(self):
|
||||
img = _make_test_image(color=True)
|
||||
result = to_grayscale(img)
|
||||
assert len(result.shape) == 2
|
||||
assert result.shape == (80, 100)
|
||||
|
||||
def test_passthrough_already_gray(self):
|
||||
img = _make_test_image(color=False)
|
||||
result = to_grayscale(img)
|
||||
assert len(result.shape) == 2
|
||||
np.testing.assert_array_equal(result, img)
|
||||
|
||||
|
||||
# --- denoise ---
|
||||
|
||||
class TestDenoise:
|
||||
def test_returns_same_shape(self):
|
||||
img = _make_test_image(color=False)
|
||||
result = denoise(img)
|
||||
assert result.shape == img.shape
|
||||
assert result.dtype == np.uint8
|
||||
|
||||
def test_custom_params(self):
|
||||
img = _make_test_image(color=False)
|
||||
result = denoise(img, d=5, sigma_color=50.0, sigma_space=50.0)
|
||||
assert result.shape == img.shape
|
||||
|
||||
def test_reduces_noise(self):
|
||||
img = _make_test_image(color=False).astype(np.float64)
|
||||
rng = np.random.default_rng(42)
|
||||
noisy = np.clip(img + rng.normal(0, 25, img.shape), 0, 255).astype(np.uint8)
|
||||
result = denoise(noisy, d=9, sigma_color=75, sigma_space=75)
|
||||
# Denoised image should be closer to original than noisy version
|
||||
orig = _make_test_image(color=False)
|
||||
noise_diff = np.mean(np.abs(noisy.astype(float) - orig.astype(float)))
|
||||
clean_diff = np.mean(np.abs(result.astype(float) - orig.astype(float)))
|
||||
assert clean_diff < noise_diff
|
||||
|
||||
|
||||
# --- enhance_contrast ---
|
||||
|
||||
class TestEnhanceContrast:
|
||||
def test_returns_same_shape(self):
|
||||
img = _make_test_image(color=False)
|
||||
result = enhance_contrast(img)
|
||||
assert result.shape == img.shape
|
||||
|
||||
def test_custom_clip_limit(self):
|
||||
img = _make_test_image(color=False)
|
||||
result = enhance_contrast(img, clip_limit=4.0, tile_grid_size=(4, 4))
|
||||
assert result.shape == img.shape
|
||||
|
||||
def test_increases_dynamic_range(self):
|
||||
# Low-contrast gray image
|
||||
img = np.full((80, 100), 120, dtype=np.uint8)
|
||||
img[20:60, 20:80] = 130
|
||||
result = enhance_contrast(img, clip_limit=2.0)
|
||||
# CLAHE should increase the spread
|
||||
assert result.std() >= img.std()
|
||||
|
||||
|
||||
# --- threshold ---
|
||||
|
||||
class TestThreshold:
|
||||
def test_otsu_produces_binary(self):
|
||||
img = _make_test_image(color=False)
|
||||
result = threshold(img)
|
||||
unique = set(np.unique(result))
|
||||
assert unique <= {0, 255}
|
||||
|
||||
def test_manual_threshold(self):
|
||||
img = _make_test_image(color=False)
|
||||
result = threshold(img, manual_thresh=100)
|
||||
unique = set(np.unique(result))
|
||||
assert unique <= {0, 255}
|
||||
|
||||
def test_manual_threshold_value(self):
|
||||
img = np.array([[50, 150], [100, 200]], dtype=np.uint8)
|
||||
result = threshold(img, manual_thresh=120)
|
||||
expected = np.array([[0, 255], [0, 255]], dtype=np.uint8)
|
||||
np.testing.assert_array_equal(result, expected)
|
||||
|
||||
|
||||
# --- edge_detect ---
|
||||
|
||||
class TestEdgeDetect:
|
||||
def test_returns_same_shape(self):
|
||||
img = _make_test_image(color=False)
|
||||
result = edge_detect(img)
|
||||
assert result.shape == img.shape
|
||||
|
||||
def test_detects_edges(self):
|
||||
img = np.zeros((100, 100), dtype=np.uint8)
|
||||
img[25:75, 25:75] = 255
|
||||
result = edge_detect(img, low=50, high=150)
|
||||
# Should have non-zero pixels along the rectangle edges
|
||||
assert np.count_nonzero(result) > 0
|
||||
# Interior and exterior should be zero
|
||||
assert result[50, 50] == 0
|
||||
assert result[0, 0] == 0
|
||||
|
||||
def test_custom_thresholds(self):
|
||||
img = _make_test_image(color=False)
|
||||
result = edge_detect(img, low=100, high=200)
|
||||
assert result.dtype == np.uint8
|
||||
|
||||
|
||||
# --- morphological_ops ---
|
||||
|
||||
class TestMorphologicalOps:
|
||||
def test_returns_same_shape(self):
|
||||
img = threshold(_make_test_image(color=False))
|
||||
result = morphological_ops(img)
|
||||
assert result.shape == img.shape
|
||||
|
||||
def test_fills_small_gaps(self):
|
||||
# Create binary image with a small hole
|
||||
img = np.zeros((100, 100), dtype=np.uint8)
|
||||
img[20:80, 20:80] = 255
|
||||
img[49:51, 49:51] = 0 # small gap
|
||||
result = morphological_ops(img, kernel_size=3, dilate_iterations=1, erode_iterations=1)
|
||||
# Gap should be filled after dilation
|
||||
assert result[49, 49] == 255
|
||||
|
||||
def test_custom_kernel_and_iterations(self):
|
||||
img = threshold(_make_test_image(color=False))
|
||||
result = morphological_ops(img, kernel_size=5, dilate_iterations=2, erode_iterations=2)
|
||||
assert result.shape == img.shape
|
||||
|
||||
|
||||
# --- full pipeline ---
|
||||
|
||||
class TestPreprocess:
|
||||
def test_end_to_end_default_params(self):
|
||||
img = _make_test_image()
|
||||
raw = _encode_png(img)
|
||||
result = preprocess(raw)
|
||||
assert len(result.shape) == 2
|
||||
assert result.dtype == np.uint8
|
||||
unique = set(np.unique(result))
|
||||
# After threshold + morph, should be mostly binary
|
||||
assert unique <= {0, 255}
|
||||
|
||||
def test_with_edge_detection(self):
|
||||
img = _make_test_image()
|
||||
raw = _encode_png(img)
|
||||
result = preprocess(raw, params={"edge_detect": True})
|
||||
assert len(result.shape) == 2
|
||||
|
||||
def test_custom_params(self):
|
||||
img = _make_test_image()
|
||||
raw = _encode_png(img)
|
||||
result = preprocess(raw, params={
|
||||
"denoise_d": 5,
|
||||
"denoise_sigma_color": 50.0,
|
||||
"clahe_clip_limit": 4.0,
|
||||
"threshold_manual": 128,
|
||||
"morph_kernel_size": 5,
|
||||
"morph_dilate_iterations": 2,
|
||||
"morph_erode_iterations": 2,
|
||||
})
|
||||
assert result.dtype == np.uint8
|
||||
assert len(result.shape) == 2
|
||||
|
||||
def test_rejects_bad_input(self):
|
||||
with pytest.raises(ValueError):
|
||||
preprocess(b"garbage data")
|
||||
Loading…
Add table
Reference in a new issue