From 6d51628ce8d7cf66ae6ea166c07536140ef89384 Mon Sep 17 00:00:00 2001 From: jlightner Date: Thu, 26 Mar 2026 04:32:31 +0000 Subject: [PATCH] =?UTF-8?q?fix:=20Implemented=20RDP=20path=20simplificatio?= =?UTF-8?q?n,=20island/hole=20detection=20via=20win=E2=80=A6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - engine/pipeline/postprocess.py - engine/tests/test_postprocess.py GSD-Task: S02/T01 --- engine/pipeline/postprocess.py | 414 +++++++++++++++++++++++++++++++ engine/tests/test_postprocess.py | 375 ++++++++++++++++++++++++++++ 2 files changed, 789 insertions(+) create mode 100644 engine/pipeline/postprocess.py create mode 100644 engine/tests/test_postprocess.py diff --git a/engine/pipeline/postprocess.py b/engine/pipeline/postprocess.py new file mode 100644 index 0000000..25a3e87 --- /dev/null +++ b/engine/pipeline/postprocess.py @@ -0,0 +1,414 @@ +"""Post-processing pipeline — RDP simplification, island detection, open path repair.""" + +from __future__ import annotations + +import math +import re +import xml.etree.ElementTree as ET +from dataclasses import dataclass, field + + +@dataclass +class PathInfo: + """Metadata and geometry for a single SVG path after post-processing.""" + + original_coords: list[tuple[float, float]] + simplified_coords: list[tuple[float, float]] + is_closed: bool + is_island: bool + node_count: int + original_node_count: int + area: float # signed — negative means clockwise (island/hole) + + +@dataclass +class PostProcessResult: + """Aggregated result of running post-processing on an SVG.""" + + paths: list[PathInfo] + svg: str + total_nodes: int + total_original_nodes: int + open_path_count: int + island_count: int + + +# --------------------------------------------------------------------------- +# SVG path parsing +# --------------------------------------------------------------------------- + +_CMD_RE = re.compile(r"([MmLlHhVvCcSsQqTtAaZz])") +_NUM_RE = 
# Numeric literal inside path data: optional sign, digits with an optional
# fraction (or a bare fraction), optional exponent.
_NUM_RE = re.compile(r"[+-]?(?:\d+\.?\d*|\.\d+)(?:[eE][+-]?\d+)?")

# One path segment: a command letter followed by its raw argument text.
_SEGMENT_RE = re.compile(r"([MmLlHhVvCcSsQqTtAaZz])([^MmLlHhVvCcSsQqTtAaZz]*)")

# Per command: (numbers consumed per segment, (x, y) argument index pairs that
# are recorded as points).  The last pair is always the segment endpoint.
# C and Q record their control points too; S, T and A record the endpoint only.
_POINT_LAYOUT = {
    "M": (2, ((0, 1),)),
    "L": (2, ((0, 1),)),
    "T": (2, ((0, 1),)),
    "C": (6, ((0, 1), (2, 3), (4, 5))),
    "S": (4, ((2, 3),)),
    "Q": (4, ((0, 1), (2, 3))),
    "A": (7, ((5, 6),)),
}


def parse_svg_path(d: str) -> list[list[tuple[float, float]]]:
    """Parse an SVG path ``d`` attribute into a list of subpaths.

    Curves are not flattened: cubic and quadratic segments contribute their
    control points and endpoint, while smooth (S/T) and arc (A) segments
    contribute their endpoint only.  That is enough for RDP simplification
    and the area/winding analysis done downstream.

    A command letter followed by several coordinate sets is treated as that
    many repeated segments.  Incomplete trailing coordinate sets are ignored.

    Args:
        d: Raw path data.

    Returns:
        List of subpaths, each a list of absolute (x, y) tuples.  A subpath
        ended by ``Z``/``z`` has its start point appended unless it already
        ends there.
    """
    subpaths: list[list[tuple[float, float]]] = []
    current: list[tuple[float, float]] = []
    cx, cy = 0.0, 0.0
    subpath_start: tuple[float, float] | None = None
    just_closed = False

    for cmd, args in _SEGMENT_RE.findall(d):
        op = cmd.upper()

        if op == "Z":
            if current:
                if subpath_start is not None and current[-1] != subpath_start:
                    current.append(subpath_start)
                subpaths.append(current)
                current = []
            if subpath_start is not None:
                # Closepath moves the current point back to the subpath start,
                # so following relative commands are measured from there.
                cx, cy = subpath_start
                just_closed = True
            continue

        relative = cmd.islower()
        nums = [float(n) for n in _NUM_RE.findall(args)]
        points: list[tuple[float, float]] = []

        if op == "H":
            for n in nums:
                cx = cx + n if relative else n
                points.append((cx, cy))
        elif op == "V":
            for n in nums:
                cy = cy + n if relative else n
                points.append((cx, cy))
        else:
            stride, layout = _POINT_LAYOUT[op]
            for k in range(0, len(nums) - stride + 1, stride):
                # Relative offsets are all measured from the point at which
                # the segment starts, including control points.
                base_x, base_y = (cx, cy) if relative else (0.0, 0.0)
                for xi, yi in layout:
                    points.append((base_x + nums[k + xi], base_y + nums[k + yi]))
                cx, cy = points[-1]

        if op == "M":
            # A moveto always ends whatever subpath was in progress.
            if current:
                subpaths.append(current)
            current = []
            if points:
                subpath_start = points[0]
            just_closed = False
        elif just_closed and points:
            # Drawing that resumes straight after a closepath starts a new
            # subpath at the same initial point.
            current = [subpath_start]
            just_closed = False

        current.extend(points)

    if current:
        subpaths.append(current)

    return subpaths
def _perpendicular_distance(
    point: tuple[float, float],
    line_start: tuple[float, float],
    line_end: tuple[float, float],
) -> float:
    """Return the distance from ``point`` to the line through two points.

    The distance is measured to the infinite line through ``line_start`` and
    ``line_end``, not to the bounded segment.  When both line points coincide
    the distance to that single point is returned instead.
    """
    dx = line_end[0] - line_start[0]
    dy = line_end[1] - line_start[1]
    length_sq = dx * dx + dy * dy
    if length_sq == 0:
        return math.hypot(point[0] - line_start[0], point[1] - line_start[1])
    num = abs(
        dy * point[0]
        - dx * point[1]
        + line_end[0] * line_start[1]
        - line_end[1] * line_start[0]
    )
    return num / math.sqrt(length_sq)


def rdp_simplify(
    coords: list[tuple[float, float]], epsilon: float = 1.0
) -> list[tuple[float, float]]:
    """Apply Ramer-Douglas-Peucker simplification to a coordinate list.

    The algorithm runs iteratively over index ranges with an explicit stack,
    so long or strongly zig-zagging paths cannot exhaust the interpreter's
    recursion limit and no intermediate slices are copied.

    Args:
        coords: List of (x, y) tuples.
        epsilon: Distance threshold — higher values produce simpler paths.
            A point is kept only when its distance is strictly greater than
            ``epsilon``, so with 0 (or a negative value) exactly collinear
            points are still dropped.

    Returns:
        A new list with the retained points in their original order.  The
        first and last points are always retained.
    """
    n = len(coords)
    if n <= 2:
        return list(coords)

    keep = [False] * n
    keep[0] = keep[-1] = True
    pending = [(0, n - 1)]

    while pending:
        start, end = pending.pop()
        if end - start < 2:
            continue  # no interior points to examine

        first, last = coords[start], coords[end]
        max_dist = 0.0
        max_idx = start
        for i in range(start + 1, end):
            dist = _perpendicular_distance(coords[i], first, last)
            if dist > max_dist:  # strict: the first maximum wins ties
                max_dist = dist
                max_idx = i

        # ``max_idx > start`` guards against a negative epsilon when every
        # interior point lies on the line: there is nothing to split on.
        if max_idx > start and max_dist > epsilon:
            keep[max_idx] = True
            pending.append((start, max_idx))
            pending.append((max_idx, end))

    return [pt for pt, kept in zip(coords, keep) if kept]
def signed_area(coords: list[tuple[float, float]]) -> float:
    """Compute the signed area of a polygon using the shoelace formula.

    The polygon is implicitly closed (the last vertex connects back to the
    first).  The result is positive when the vertices run counter-clockwise
    in a y-up coordinate frame and negative when they run clockwise.  On an
    SVG canvas, where y grows downward, the visual direction is mirrored.
    This module treats a negative area as an island / hole.

    Returns 0.0 for fewer than three vertices.
    """
    if len(coords) < 3:
        return 0.0
    area = 0.0
    # Pair every vertex with its successor, wrapping the last to the first.
    for (x0, y0), (x1, y1) in zip(coords, coords[1:] + coords[:1]):
        area += x0 * y1
        area -= x1 * y0
    return area / 2.0


def is_closed(coords: list[tuple[float, float]], tolerance: float = 1.0) -> bool:
    """Return True when the first and last points lie within ``tolerance``.

    Paths with fewer than two points are never considered closed.
    """
    if len(coords) < 2:
        return False
    (x_first, y_first), (x_last, y_last) = coords[0], coords[-1]
    return math.hypot(x_last - x_first, y_last - y_first) <= tolerance


def close_path(coords: list[tuple[float, float]]) -> list[tuple[float, float]]:
    """Return a closed copy of ``coords``.

    The start point is appended unless the path already ends on it.  The
    result is always a new list, including for empty input, so callers may
    mutate it without affecting ``coords``.
    """
    closed = list(coords)
    if closed and closed[-1] != closed[0]:
        closed.append(closed[0])
    return closed


def detect_island(coords: list[tuple[float, float]]) -> bool:
    """Return True when a closed path is an island (hole).

    A path is classified as an island when its signed shoelace area is
    negative; outer contours are expected to have a positive area.
    """
    return signed_area(coords) < 0


# ---------------------------------------------------------------------------
# Node counting
# ---------------------------------------------------------------------------


def node_count(coords: list[tuple[float, float]]) -> int:
    """Return the number of points in a path.

    Every entry is counted, so a closed path whose last point repeats its
    first point counts that point twice.
    """
    return len(coords)
def postprocess_svg(
    svg_str: str,
    epsilon: float = 1.0,
    close_tolerance: float = 1.0,
    auto_close: bool = False,
) -> PostProcessResult:
    """Run the full post-processing pipeline on an SVG string.

    For every subpath of every ``<path>`` element:

    1. Parse the path data into coordinates.
    2. Apply RDP simplification with the given epsilon.
    3. Detect whether the result is closed, optionally closing it.
    4. Classify closed paths with at least three points as island or not.
    5. Record node counts before and after simplification.

    Subpaths with fewer than two points and paths with empty data are skipped.

    Args:
        svg_str: Input SVG string.
        epsilon: RDP simplification tolerance.
        close_tolerance: Distance threshold for considering a path closed.
        auto_close: If True, append the start point to open paths and report
            them as closed.

    Returns:
        PostProcessResult with per-path metadata, aggregate counts and the
        rebuilt SVG.

    Raises:
        xml.etree.ElementTree.ParseError: If ``svg_str`` is not well-formed XML.
    """
    root = ET.fromstring(svg_str)
    ns = {"svg": "http://www.w3.org/2000/svg"}

    # Search the whole tree, not just the root's children, so paths wrapped in
    # <g> (or any other container) are not silently dropped.  Namespaced
    # elements take priority; un-namespaced ones are the fallback.
    # NOTE: transforms on ancestor elements are not applied to the coordinates.
    path_elements = root.findall(".//svg:path", ns) or root.findall(".//path")

    path_infos: list[PathInfo] = []

    for path_el in path_elements:
        d = path_el.get("d", "")
        if not d.strip():
            continue

        for coords in parse_svg_path(d):
            if len(coords) < 2:
                continue

            simplified = rdp_simplify(coords, epsilon)
            closed = is_closed(simplified, close_tolerance)

            if auto_close and not closed:
                simplified = close_path(simplified)
                closed = True

            # Winding is only meaningful for a closed polygon.
            island = closed and len(simplified) >= 3 and detect_island(simplified)

            path_infos.append(
                PathInfo(
                    original_coords=coords,
                    simplified_coords=simplified,
                    is_closed=closed,
                    is_island=island,
                    node_count=node_count(simplified),
                    original_node_count=node_count(coords),
                    area=signed_area(simplified),
                )
            )

    # Rebuild SVG with simplified paths
    rebuilt_svg = _rebuild_svg(root, path_infos)

    return PostProcessResult(
        paths=path_infos,
        svg=rebuilt_svg,
        total_nodes=sum(p.node_count for p in path_infos),
        total_original_nodes=sum(p.original_node_count for p in path_infos),
        open_path_count=sum(1 for p in path_infos if not p.is_closed),
        island_count=sum(1 for p in path_infos if p.is_island),
    )
def _rebuild_svg(root: ET.Element, path_infos: "list[PathInfo]") -> str:
    """Rebuild an SVG document string from post-processed path data.

    Every path with at least two simplified points becomes one ``M … L …``
    subpath (terminated by ``Z`` when closed).  All subpaths are combined
    into a single ``<path>`` element.

    Args:
        root: Root element of the input SVG.  Its ``width``, ``height`` and
            ``viewBox`` attributes are carried over; width and height default
            to "100" and the view box to ``0 0 <width> <height>``.
        path_infos: Post-processed paths; only ``simplified_coords`` and
            ``is_closed`` are read.

    Returns:
        Serialized SVG markup in the SVG namespace.
    """
    width = root.get("width", "100")
    height = root.get("height", "100")
    viewbox = root.get("viewBox", f"0 0 {width} {height}")

    subpath_strings = []
    for info in path_infos:
        coords = info.simplified_coords
        if len(coords) < 2:
            continue
        (x0, y0), *rest = coords
        commands = [f"M {x0:.3f},{y0:.3f}"]
        commands.extend(f"L {x:.3f},{y:.3f}" for x, y in rest)
        if info.is_closed:
            commands.append("Z")
        subpath_strings.append(" ".join(commands))

    d = " ".join(subpath_strings)

    # Build through ElementTree so attribute values copied from the input
    # document are escaped correctly.
    svg = ET.Element(
        "svg",
        {
            "xmlns": "http://www.w3.org/2000/svg",
            "viewBox": viewbox,
            "width": width,
            "height": height,
        },
    )
    # NOTE(review): evenodd is chosen so nested subpaths render as holes in
    # the combined path — confirm against the intended output styling.
    ET.SubElement(svg, "path", {"d": d, "fill-rule": "evenodd"})
    return ET.tostring(svg, encoding="unicode")
def _make_svg(d: str, width: int = 100, height: int = 100) -> str:
    """Build a minimal namespaced SVG document around the path data ``d``."""
    return (
        f'<svg xmlns="http://www.w3.org/2000/svg" '
        f'viewBox="0 0 {width} {height}" width="{width}" height="{height}">'
        f'<path d="{d}"/>'
        f"</svg>"
    )


# Closed square: 0,0 → 100,0 → 100,100 → 0,100 → close (positive signed area).
SQUARE_D = "M 0,0 L 100,0 L 100,100 L 0,100 Z"

# Closed triangle.
TRIANGLE_D = "M 50,0 L 100,100 L 0,100 Z"

# Open path: no Z and the endpoints differ.
OPEN_D = "M 0,0 L 50,50 L 100,0"

# Same square with the opposite winding (negative signed area → island/hole).
CW_SQUARE_D = "M 0,0 L 0,100 L 100,100 L 100,0 Z"


# ---------------------------------------------------------------------------
# SVG path parsing
# ---------------------------------------------------------------------------

class TestParseSvgPath:
    """Parsing of path data into coordinate subpaths."""

    def test_simple_move_and_lines(self):
        parsed = parse_svg_path("M 0,0 L 10,0 L 10,10 Z")
        assert len(parsed) == 1
        assert parsed[0][:3] == [(0.0, 0.0), (10.0, 0.0), (10.0, 10.0)]

    def test_multiple_subpaths(self):
        assert len(parse_svg_path("M 0,0 L 10,10 Z M 20,20 L 30,30 Z")) == 2

    def test_cubic_bezier(self):
        parsed = parse_svg_path("M 0,0 C 10,20 30,40 50,60 Z")
        assert len(parsed) == 1
        points = parsed[0]
        assert len(points) >= 2
        # The curve endpoint is kept; Z then returns to the start point.
        assert (50.0, 60.0) in points
        assert points[-1] == (0.0, 0.0)

    def test_relative_lineto(self):
        parsed = parse_svg_path("M 10,10 l 5,0 l 0,5 Z")
        assert len(parsed) == 1
        assert parsed[0][:3] == [(10.0, 10.0), (15.0, 10.0), (15.0, 15.0)]

    def test_horizontal_vertical(self):
        parsed = parse_svg_path("M 0,0 H 10 V 10 Z")
        assert len(parsed) == 1
        for expected in ((10.0, 0.0), (10.0, 10.0)):
            assert expected in parsed[0]

    def test_empty_path(self):
        assert parse_svg_path("") == []

    def test_move_only(self):
        parsed = parse_svg_path("M 5,5")
        assert len(parsed) == 1
        assert parsed[0] == [(5.0, 5.0)]

    def test_quadratic_bezier(self):
        parsed = parse_svg_path("M 0,0 Q 50,100 100,0 Z")
        assert len(parsed) == 1
        points = parsed[0]
        assert (100.0, 0.0) in points
        assert points[-1] == (0.0, 0.0)  # Z closes back to the start
# ---------------------------------------------------------------------------
# RDP simplification
# ---------------------------------------------------------------------------

class TestRdpSimplify:
    """Behaviour of the Ramer-Douglas-Peucker simplifier."""

    def test_collinear_points_reduced(self):
        """A straight run of points collapses to its two endpoints."""
        line = [(0, 0), (1, 1), (2, 2), (3, 3), (4, 4)]
        simplified = rdp_simplify(line, epsilon=0.1)
        assert simplified == [(0, 0), (4, 4)]

    def test_preserves_corners(self):
        """A right-angle corner survives simplification."""
        corner = [(0, 0), (10, 0), (10, 10)]
        assert len(rdp_simplify(corner, epsilon=0.5)) == 3

    def test_epsilon_zero_preserves_all(self):
        """With epsilon=0 every off-line point is kept."""
        bump = [(0, 0), (5, 1), (10, 0)]
        assert len(rdp_simplify(bump, epsilon=0.0)) == 3

    def test_high_epsilon_aggressive(self):
        """A large epsilon removes every interior point."""
        wobble = [(0, 0), (5, 0.5), (10, 0), (15, 0.3), (20, 0)]
        assert len(rdp_simplify(wobble, epsilon=10.0)) == 2

    def test_two_points_unchanged(self):
        assert rdp_simplify([(0, 0), (10, 10)], epsilon=1.0) == [(0, 0), (10, 10)]

    def test_single_point_unchanged(self):
        assert rdp_simplify([(5, 5)], epsilon=1.0) == [(5, 5)]

    def test_empty_input(self):
        assert rdp_simplify([], epsilon=1.0) == []

    def test_reduces_node_count(self):
        """A densely sampled circle comes back with fewer nodes."""
        samples = 100
        circle = []
        for step in range(samples):
            angle = 2 * math.pi * step / samples
            circle.append((50 + 40 * math.cos(angle), 50 + 40 * math.sin(angle)))
        simplified = rdp_simplify(circle, epsilon=2.0)
        assert len(simplified) < len(circle)
        assert len(simplified) >= 3  # must retain at least a polygon
# ---------------------------------------------------------------------------
# Signed area / winding detection
# ---------------------------------------------------------------------------

class TestSignedArea:
    """Sign and magnitude of the shoelace area."""

    def test_ccw_square_positive(self):
        """A counter-clockwise square has positive area."""
        ccw = [(0, 0), (100, 0), (100, 100), (0, 100)]
        assert signed_area(ccw) > 0

    def test_cw_square_negative(self):
        """A clockwise square has negative area."""
        cw = [(0, 0), (0, 100), (100, 100), (100, 0)]
        assert signed_area(cw) < 0

    def test_area_magnitude(self):
        """A 10x10 square has an absolute area of 100."""
        square = [(0, 0), (10, 0), (10, 10), (0, 10)]
        magnitude = abs(signed_area(square))
        assert magnitude == pytest.approx(100.0)

    def test_degenerate_line(self):
        """Two points enclose no area."""
        assert signed_area([(0, 0), (10, 10)]) == 0.0

    def test_single_point(self):
        assert signed_area([(0, 0)]) == 0.0

    def test_empty(self):
        assert signed_area([]) == 0.0


# ---------------------------------------------------------------------------
# Island detection
# ---------------------------------------------------------------------------

class TestDetectIsland:
    """Winding-based island classification."""

    def test_ccw_is_not_island(self):
        outer = [(0, 0), (100, 0), (100, 100), (0, 100)]
        assert detect_island(outer) is False

    def test_cw_is_island(self):
        hole = [(0, 0), (0, 100), (100, 100), (100, 0)]
        assert detect_island(hole) is True


# ---------------------------------------------------------------------------
# Open path detection + repair
# ---------------------------------------------------------------------------

class TestIsClosed:
    """Closed/open detection from the path endpoints."""

    def test_closed_path(self):
        loop = [(0, 0), (10, 0), (10, 10), (0, 0)]
        assert is_closed(loop) is True

    def test_open_path(self):
        polyline = [(0, 0), (10, 0), (10, 10)]
        assert is_closed(polyline) is False

    def test_nearly_closed(self):
        """Endpoints within the tolerance count as closed."""
        almost = [(0, 0), (10, 0), (10, 10), (0.5, 0.3)]
        assert is_closed(almost, tolerance=1.0) is True

    def test_single_point(self):
        assert is_closed([(0, 0)]) is False

    def test_empty(self):
        assert is_closed([]) is False
class TestClosePath:
    """Repair of open paths."""

    def test_closes_open_path(self):
        closed = close_path([(0, 0), (10, 0), (10, 10)])
        assert closed[0] == closed[-1]
        assert len(closed) == 4

    def test_already_closed(self):
        loop = [(0, 0), (10, 0), (10, 10), (0, 0)]
        # No duplicate closing point is added.
        assert len(close_path(loop)) == 4

    def test_empty(self):
        assert close_path([]) == []


# ---------------------------------------------------------------------------
# Node counting
# ---------------------------------------------------------------------------

class TestNodeCount:
    """Node counting helper."""

    def test_counts_nodes(self):
        three_points = [(0, 0), (1, 1), (2, 2)]
        assert node_count(three_points) == 3

    def test_empty(self):
        assert node_count([]) == 0
# ---------------------------------------------------------------------------
# Full pipeline integration
# ---------------------------------------------------------------------------

class TestPostprocessSvg:
    """End-to-end checks of ``postprocess_svg`` on hand-written SVGs."""

    def test_returns_result_object(self):
        outcome = postprocess_svg(_make_svg(SQUARE_D))
        assert isinstance(outcome, PostProcessResult)

    def test_path_count(self):
        outcome = postprocess_svg(_make_svg(SQUARE_D))
        assert len(outcome.paths) >= 1

    def test_node_count_reduction(self):
        """Simplification never increases a path's node count."""
        outcome = postprocess_svg(_make_svg(SQUARE_D), epsilon=0.5)
        for info in outcome.paths:
            assert info.node_count <= info.original_node_count

    def test_total_nodes_tracked(self):
        outcome = postprocess_svg(_make_svg(SQUARE_D))
        expected_total = sum(info.node_count for info in outcome.paths)
        assert outcome.total_nodes == expected_total

    def test_closed_path_detected(self):
        outcome = postprocess_svg(_make_svg(SQUARE_D))
        # A square terminated by Z must be reported as closed.
        assert any(info.is_closed for info in outcome.paths)

    def test_open_path_detected(self):
        outcome = postprocess_svg(_make_svg(OPEN_D))
        assert outcome.open_path_count >= 1

    def test_auto_close(self):
        outcome = postprocess_svg(_make_svg(OPEN_D), auto_close=True)
        # Auto-close leaves no open paths behind.
        assert outcome.open_path_count == 0

    def test_island_detection(self):
        # An outer contour followed by an oppositely wound inner contour.
        outer_and_hole = f"{SQUARE_D} {CW_SQUARE_D}"
        outcome = postprocess_svg(_make_svg(outer_and_hole))
        assert outcome.island_count >= 1

    def test_output_svg_is_well_formed(self):
        import xml.etree.ElementTree as ET

        outcome = postprocess_svg(_make_svg(SQUARE_D))
        document = ET.fromstring(outcome.svg)
        assert document.tag == "{http://www.w3.org/2000/svg}svg"

    def test_output_svg_has_path(self):
        import xml.etree.ElementTree as ET

        outcome = postprocess_svg(_make_svg(SQUARE_D))
        document = ET.fromstring(outcome.svg)
        namespaces = {"svg": "http://www.w3.org/2000/svg"}
        assert len(document.findall("svg:path", namespaces)) >= 1

    def test_epsilon_affects_simplification(self):
        """A higher epsilon yields fewer or equal nodes."""
        # Polygonal approximation of a circle with many vertices.
        samples = 50
        segments = []
        for step in range(1, samples):
            angle = 2 * math.pi * step / samples
            segments.append(
                f"L {50 + 40 * math.cos(angle):.3f},"
                f"{50 + 40 * math.sin(angle):.3f}"
            )
        points = " ".join(segments)
        x0 = 50 + 40 * math.cos(0)
        y0 = 50 + 40 * math.sin(0)
        svg = _make_svg(f"M {x0:.3f},{y0:.3f} {points} Z")

        fine = postprocess_svg(svg, epsilon=0.1)
        coarse = postprocess_svg(svg, epsilon=10.0)
        assert coarse.total_nodes <= fine.total_nodes
class TestPostprocessWithVectorizerOutput:
    """Integration tests feeding real vectorizer SVG through post-processing."""

    def test_potrace_output(self):
        """Potrace output of a white square can be post-processed."""
        import numpy as np
        from pipeline.vectorize import potrace_trace

        bitmap = np.zeros((100, 100), dtype=np.uint8)
        bitmap[20:80, 20:80] = 255

        outcome = postprocess_svg(potrace_trace(bitmap), epsilon=1.0)
        assert isinstance(outcome, PostProcessResult)
        assert len(outcome.paths) >= 1
        assert outcome.total_nodes > 0

    def test_vtracer_output(self):
        """VTracer output of a white square can be post-processed."""
        import numpy as np
        from pipeline.vectorize import vtracer_trace

        bitmap = np.zeros((100, 100), dtype=np.uint8)
        bitmap[20:80, 20:80] = 255

        outcome = postprocess_svg(vtracer_trace(bitmap), epsilon=1.0)
        assert isinstance(outcome, PostProcessResult)
        assert len(outcome.paths) >= 1
        assert outcome.total_nodes > 0