fix: Implemented RDP path simplification, island/hole detection via win…

- engine/pipeline/postprocess.py
- engine/tests/test_postprocess.py

GSD-Task: S02/T01
This commit is contained in:
jlightner 2026-03-26 04:32:31 +00:00
parent a91c99dd6c
commit 6d51628ce8
2 changed files with 789 additions and 0 deletions

View file

@ -0,0 +1,414 @@
"""Post-processing pipeline — RDP simplification, island detection, open path repair."""
from __future__ import annotations
import math
import re
import xml.etree.ElementTree as ET
from dataclasses import dataclass, field
@dataclass
class PathInfo:
    """Geometry and derived metadata for one subpath after post-processing."""

    # Coordinates exactly as parsed from the path data.
    original_coords: list[tuple[float, float]]
    # Coordinates remaining after simplification (and optional closing).
    simplified_coords: list[tuple[float, float]]
    # True when the subpath is treated as closed.
    is_closed: bool
    # True when the subpath was classified as an island/hole.
    is_island: bool
    # Number of entries in ``simplified_coords``.
    node_count: int
    # Number of entries in ``original_coords``.
    original_node_count: int
    # Signed shoelace area; negative values are the ones classified as
    # island/hole winding.
    area: float
@dataclass
class PostProcessResult:
    """Everything produced by one post-processing run over an SVG."""

    # One entry per processed subpath.
    paths: list[PathInfo]
    # The SVG document rebuilt from the simplified subpaths.
    svg: str
    # Sum of ``node_count`` over ``paths``.
    total_nodes: int
    # Sum of ``original_node_count`` over ``paths``.
    total_original_nodes: int
    # How many entries in ``paths`` are not closed.
    open_path_count: int
    # How many entries in ``paths`` are flagged as islands.
    island_count: int
# ---------------------------------------------------------------------------
# SVG path parsing
# ---------------------------------------------------------------------------
_CMD_RE = re.compile(r"([MmLlHhVvCcSsQqTtAaZz])")
_NUM = r"[+-]?(?:\d+\.?\d*|\.\d+)(?:[eE][+-]?\d+)?"
_NUM_RE = re.compile(_NUM)
# One elliptical-arc segment: rx ry x-axis-rotation large-arc-flag sweep-flag
# x y.  The two flags are matched as single 0/1 characters so compact
# notation such as "0 011 1" (flags 0 and 1, then x=1, y=1) tokenises
# correctly, which a plain number scan cannot do.
_ARC_RE = re.compile(
    r"[\s,]*".join([f"({_NUM})"] * 3 + ["([01])"] * 2 + [f"({_NUM})"] * 2)
)
# Number of parameters one segment of each command consumes (upper-case key).
_ARITY = {"M": 2, "L": 2, "H": 1, "V": 1, "C": 6, "S": 4, "Q": 4, "T": 2, "A": 7}


def _command_points(
    upper: str,
    relative: bool,
    nums: list[float],
    cx: float,
    cy: float,
) -> tuple[list[tuple[float, float]], float, float]:
    """Expand one command's parameters into points and the new current point.

    Only complete parameter groups are consumed; trailing leftovers are
    ignored.  Returns ``(points, cx, cy)``.
    """
    arity = _ARITY[upper]
    points: list[tuple[float, float]] = []
    for k in range(0, len(nums) - arity + 1, arity):
        seg = nums[k : k + arity]
        if upper == "H":
            cx = cx + seg[0] if relative else seg[0]
            points.append((cx, cy))
        elif upper == "V":
            cy = cy + seg[0] if relative else seg[0]
            points.append((cx, cy))
        else:
            if upper == "A":
                # Radii, rotation and flags are not coordinates.
                pairs = [(seg[5], seg[6])]
            else:
                pairs = list(zip(seg[0::2], seg[1::2]))
            if upper == "S":
                # Smooth cubic: only the endpoint is recorded.
                pairs = pairs[-1:]
            if relative:
                # Every pair of a segment is relative to the point the
                # segment started from, not to the previous pair.
                pairs = [(cx + x, cy + y) for x, y in pairs]
            points.extend(pairs)
            cx, cy = pairs[-1]
    return points, cx, cy


def parse_svg_path(d: str) -> list[list[tuple[float, float]]]:
    """Parse an SVG path ``d`` attribute into a list of subpaths.

    Each subpath is a list of (x, y) tuples.  Curves are not flattened:
    ``C``/``Q`` contribute their control points and endpoint, while
    ``S``/``T``/``A`` contribute one endpoint per segment.  That is enough
    for RDP simplification and winding analysis.

    Behaviour worth knowing:

    * ``Z`` appends the subpath's start point (unless already there), ends
      the subpath, and moves the current point back to that start, so a
      following relative command is measured from the right place.
    * A drawing command that directly follows ``Z`` without an ``M`` begins
      a new subpath at the previous subpath's start point.
    * Incomplete trailing parameter groups are ignored instead of raising.

    Args:
        d: Path data string.

    Returns:
        List of subpaths, each a list of (x, y) tuples.
    """
    # With a single capture group, split() alternates text and command
    # letters: [leading, cmd, args, cmd, args, ...].
    parts = _CMD_RE.split(d)

    subpaths: list[list[tuple[float, float]]] = []
    current: list[tuple[float, float]] = []
    cx, cy = 0.0, 0.0
    subpath_start: tuple[float, float] | None = None
    # Start point to re-use if drawing continues after a closepath.
    reopen_at: tuple[float, float] | None = None

    for cmd, args in zip(parts[1::2], parts[2::2]):
        if cmd in ("Z", "z"):
            if current and subpath_start is not None:
                if current[-1] != subpath_start:
                    current.append(subpath_start)
                subpaths.append(current)
                current = []
                cx, cy = subpath_start
                reopen_at = subpath_start
            continue

        upper = cmd.upper()
        if upper == "A":
            nums = [
                float(group)
                for match in _ARC_RE.finditer(args)
                for group in match.groups()
            ]
        else:
            nums = [float(n) for n in _NUM_RE.findall(args)]

        if upper == "M":
            # A moveto always terminates whatever subpath was in progress.
            if current:
                subpaths.append(current)
                current = []
            reopen_at = None

        points, cx, cy = _command_points(upper, cmd.islower(), nums, cx, cy)
        if not points:
            continue

        if upper == "M":
            subpath_start = points[0]
        elif not current and reopen_at is not None:
            current.append(reopen_at)
        reopen_at = None
        current.extend(points)

    if current:
        subpaths.append(current)
    return subpaths
# ---------------------------------------------------------------------------
# RDP simplification
# ---------------------------------------------------------------------------
def _perpendicular_distance(
    point: tuple[float, float],
    line_start: tuple[float, float],
    line_end: tuple[float, float],
) -> float:
    """Return the shortest distance from *point* to a line segment.

    When the projection of *point* falls between the two endpoints this is
    the ordinary perpendicular distance.  When it falls outside, the
    distance to the nearer endpoint is returned, so a point that overshoots
    the segment along its own direction is not reported as lying on it.
    A zero-length segment degrades to plain point-to-point distance.
    """
    px, py = point
    ax, ay = line_start
    bx, by = line_end
    dx = bx - ax
    dy = by - ay
    length_sq = dx * dx + dy * dy
    if length_sq == 0:
        return math.hypot(px - ax, py - ay)

    # Position of the projection along the segment: 0 at start, 1 at end.
    t = ((px - ax) * dx + (py - ay) * dy) / length_sq
    if t <= 0.0:
        return math.hypot(px - ax, py - ay)
    if t >= 1.0:
        return math.hypot(px - bx, py - by)

    num = abs(dy * px - dx * py + bx * ay - by * ax)
    return num / math.sqrt(length_sq)


def rdp_simplify(
    coords: list[tuple[float, float]], epsilon: float = 1.0
) -> list[tuple[float, float]]:
    """Apply Ramer-Douglas-Peucker simplification to a coordinate list.

    The algorithm runs with an explicit work stack instead of recursion, so
    long or adversarial paths cannot exhaust the interpreter's recursion
    limit.

    Args:
        coords: List of (x, y) tuples.
        epsilon: Distance threshold; higher values produce simpler paths.
            A point is kept only if it is strictly farther than this from
            the chord it is tested against.  Negative values behave like 0.

    Returns:
        Simplified list of (x, y) tuples.  The first and last input points
        are always retained; inputs of two or fewer points are returned as
        a copy.
    """
    count = len(coords)
    if count <= 2:
        return list(coords)

    keep = [False] * count
    keep[0] = keep[-1] = True
    pending = [(0, count - 1)]

    while pending:
        first, last = pending.pop()
        max_dist = 0.0
        max_idx = first
        for idx in range(first + 1, last):
            dist = _perpendicular_distance(coords[idx], coords[first], coords[last])
            if dist > max_dist:
                max_dist = dist
                max_idx = idx
        # max_idx stays at `first` when no interior point deviates at all;
        # splitting there would never shrink the span.
        if max_dist > epsilon and max_idx != first:
            keep[max_idx] = True
            pending.append((first, max_idx))
            pending.append((max_idx, last))

    return [pt for pt, kept in zip(coords, keep) if kept]
# ---------------------------------------------------------------------------
# Geometric analysis
# ---------------------------------------------------------------------------
def signed_area(coords: list[tuple[float, float]]) -> float:
    """Return the signed polygon area given by the shoelace formula.

    The polygon is implicitly closed (last vertex connects to the first).
    The sign encodes vertex order: positive for counter-clockwise order in
    a y-up frame (which is drawn clockwise on a y-down canvas such as SVG),
    negative for the opposite order.  This module treats negative values as
    island/hole winding.  Fewer than three vertices yield ``0.0``.
    """
    vertex_total = len(coords)
    if vertex_total < 3:
        return 0.0

    twice_area = 0.0
    for nxt, (x0, y0) in enumerate(coords, start=1):
        # Wrap around so the final vertex pairs with the first one.
        x1, y1 = coords[nxt % vertex_total]
        twice_area += x0 * y1
        twice_area -= x1 * y0
    return twice_area / 2.0
def is_closed(coords: list[tuple[float, float]], tolerance: float = 1.0) -> bool:
    """Report whether a path ends within *tolerance* of where it starts.

    Paths with fewer than two points are never considered closed.
    """
    if len(coords) < 2:
        return False

    (start_x, start_y), (end_x, end_y) = coords[0], coords[-1]
    gap = math.hypot(end_x - start_x, end_y - start_y)
    return gap <= tolerance
def close_path(coords: list[tuple[float, float]]) -> list[tuple[float, float]]:
    """Return a closed copy of *coords*.

    If the last point differs from the first, the first point is appended.
    The comparison is exact; no tolerance is applied.

    The result is always a new list, including for empty input, so callers
    can mutate it without affecting the list they passed in.
    """
    if coords and coords[-1] != coords[0]:
        return [*coords, coords[0]]
    return list(coords)
def detect_island(coords: list[tuple[float, float]]) -> bool:
    """Classify a closed path as an island (hole) from its winding.

    A path counts as an island when its signed shoelace area is strictly
    negative; zero-area and positive-area paths are not islands.  This is
    the convention this module uses to tell holes from outer contours.
    """
    winding_area = signed_area(coords)
    return winding_area < 0
# ---------------------------------------------------------------------------
# Node counting
# ---------------------------------------------------------------------------
def node_count(coords: list[tuple[float, float]]) -> int:
    """Return how many coordinate entries the path holds.

    Every entry is counted, including repeated points such as a closing
    point that duplicates the start.
    """
    total = len(coords)
    return total
# ---------------------------------------------------------------------------
# Full post-processing pipeline
# ---------------------------------------------------------------------------
# Containers whose children are definitions rather than directly rendered
# geometry; paths inside them are not collected.
_NON_RENDERED_CONTAINERS = frozenset(
    {"defs", "clipPath", "mask", "symbol", "pattern", "marker"}
)


def _iter_path_elements(root: ET.Element):
    """Yield ``<path>`` elements under *root* in document order, at any depth."""
    pending = [root]
    while pending:
        element = pending.pop()
        tag = element.tag
        # Drop any "{namespace}" prefix so namespaced and plain documents
        # are handled alike.
        local_name = tag.rpartition("}")[2] if isinstance(tag, str) else ""
        if local_name in _NON_RENDERED_CONTAINERS:
            continue
        if local_name == "path":
            yield element
        # Reversed so that pop() visits children first-to-last.
        pending.extend(reversed(list(element)))


def postprocess_svg(
    svg_str: str,
    epsilon: float = 1.0,
    close_tolerance: float = 1.0,
    auto_close: bool = False,
) -> PostProcessResult:
    """Run the full post-processing pipeline on an SVG string.

    1. Parse SVG path data
    2. Apply RDP simplification with given epsilon
    3. Detect islands (negative signed area)
    4. Detect and optionally repair open paths
    5. Count nodes per path

    ``<path>`` elements are collected from the whole document, including
    those nested inside groups, but not from definition containers such as
    ``<defs>``.  Only the ``d`` attribute is read: ``transform`` attributes
    on a path or its ancestors are not applied to the coordinates.

    Args:
        svg_str: Input SVG string.
        epsilon: RDP simplification tolerance.
        close_tolerance: Distance threshold for considering a path closed.
        auto_close: If True, append start point to open paths.

    Returns:
        PostProcessResult with per-path metadata and rebuilt SVG.

    Raises:
        xml.etree.ElementTree.ParseError: If *svg_str* is not well-formed XML.
    """
    root = ET.fromstring(svg_str)
    path_infos: list[PathInfo] = []

    for path_el in _iter_path_elements(root):
        d = path_el.get("d", "")
        if not d.strip():
            continue

        for coords in parse_svg_path(d):
            # A lone point carries no geometry worth keeping.
            if len(coords) < 2:
                continue

            original_count = node_count(coords)
            simplified = rdp_simplify(coords, epsilon)

            closed = is_closed(simplified, close_tolerance)
            if auto_close and not closed:
                simplified = close_path(simplified)
                closed = True

            # Winding is only meaningful for a closed shape with real area.
            island = (
                detect_island(simplified)
                if closed and len(simplified) >= 3
                else False
            )

            path_infos.append(
                PathInfo(
                    original_coords=coords,
                    simplified_coords=simplified,
                    is_closed=closed,
                    is_island=island,
                    node_count=node_count(simplified),
                    original_node_count=original_count,
                    area=signed_area(simplified),
                )
            )

    # Rebuild SVG with simplified paths
    rebuilt_svg = _rebuild_svg(root, path_infos)

    return PostProcessResult(
        paths=path_infos,
        svg=rebuilt_svg,
        total_nodes=sum(p.node_count for p in path_infos),
        total_original_nodes=sum(p.original_node_count for p in path_infos),
        open_path_count=sum(1 for p in path_infos if not p.is_closed),
        island_count=sum(1 for p in path_infos if p.is_island),
    )
def _rebuild_svg(root: ET.Element, path_infos: list[PathInfo]) -> str:
    """Rebuild an SVG document string from post-processed path data.

    All subpaths are emitted as straight-line segments inside a single
    ``<path>`` element filled with the even-odd rule.  Subpaths with fewer
    than two points are skipped.

    Args:
        root: Root element of the source SVG; its ``width``, ``height`` and
            ``viewBox`` attributes are carried over (``width``/``height``
            default to ``"100"``).
        path_infos: Processed subpaths; only ``simplified_coords`` and
            ``is_closed`` are read.

    Returns:
        The rebuilt SVG as a string.
    """
    # Local import keeps this block self-contained.
    from xml.sax.saxutils import quoteattr

    def _unitless(length: str) -> str:
        # viewBox takes plain numbers, so "100pt" must become "100".  Values
        # with no leading number are passed through unchanged.
        match = re.match(r"\s*([+-]?(?:\d+\.?\d*|\.\d+)(?:[eE][+-]?\d+)?)", length)
        return match.group(1) if match else length

    width = root.get("width", "100")
    height = root.get("height", "100")
    viewbox = root.get("viewBox")
    if viewbox is None:
        viewbox = f"0 0 {_unitless(width)} {_unitless(height)}"

    subpath_strings = []
    for info in path_infos:
        coords = info.simplified_coords
        if len(coords) < 2:
            continue
        (x0, y0), rest = coords[0], coords[1:]
        commands = [f"M {x0:.3f},{y0:.3f}"]
        commands.extend(f"L {x:.3f},{y:.3f}" for x, y in rest)
        if info.is_closed:
            commands.append("Z")
        subpath_strings.append(" ".join(commands))
    d = " ".join(subpath_strings)

    # quoteattr() escapes the value and supplies the surrounding quotes, so
    # attribute text read back from the parsed document (where entities are
    # already decoded) cannot break the generated markup.
    return (
        '<svg xmlns="http://www.w3.org/2000/svg" '
        f"width={quoteattr(width)} height={quoteattr(height)} "
        f"viewBox={quoteattr(viewbox)}>"
        f'<path d="{d}" fill="black" fill-rule="evenodd" stroke="none"/>'
        "</svg>"
    )

View file

@ -0,0 +1,375 @@
"""Tests for the post-processing pipeline (RDP, island detection, open path repair)."""
import math
import pytest
from pipeline.postprocess import (
PostProcessResult,
close_path,
detect_island,
is_closed,
node_count,
parse_svg_path,
postprocess_svg,
rdp_simplify,
signed_area,
)
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _make_svg(d: str, width: int = 100, height: int = 100) -> str:
    """Wrap path data *d* in a minimal single-path SVG document."""
    pieces = [
        f'<svg xmlns="http://www.w3.org/2000/svg" ',
        f'width="{width}" height="{height}" ',
        f'viewBox="0 0 {width} {height}">',
        f'<path d="{d}" fill="black" fill-rule="evenodd" stroke="none"/>',
        f"</svg>",
    ]
    return "".join(pieces)


# Closed square visiting (0,0), (100,0), (100,100), (0,100), then Z.
SQUARE_D = "M 0,0 L 100,0 L 100,100 L 0,100 Z"

# Closed triangle.
TRIANGLE_D = "M 50,0 L 100,100 L 0,100 Z"

# Open polyline: no Z, and the endpoints are far apart.
OPEN_D = "M 0,0 L 50,50 L 100,0"

# Same square as SQUARE_D with the vertices visited in the opposite order;
# used as the island/hole fixture.
CW_SQUARE_D = "M 0,0 L 0,100 L 100,100 L 100,0 Z"
# ---------------------------------------------------------------------------
# SVG path parsing
# ---------------------------------------------------------------------------
class TestParseSvgPath:
    """Parsing of path ``d`` strings into lists of coordinate subpaths."""

    def test_simple_move_and_lines(self):
        parsed = parse_svg_path("M 0,0 L 10,0 L 10,10 Z")
        assert len(parsed) == 1
        first, second, third = parsed[0][:3]
        assert first == (0.0, 0.0)
        assert second == (10.0, 0.0)
        assert third == (10.0, 10.0)

    def test_multiple_subpaths(self):
        parsed = parse_svg_path("M 0,0 L 10,10 Z M 20,20 L 30,30 Z")
        assert len(parsed) == 2

    def test_cubic_bezier(self):
        parsed = parse_svg_path("M 0,0 C 10,20 30,40 50,60 Z")
        assert len(parsed) == 1
        points = parsed[0]
        assert len(points) >= 2
        # The curve endpoint must be recorded, and Z must bring the
        # subpath back to its starting point.
        assert (50.0, 60.0) in points
        assert points[-1] == (0.0, 0.0)

    def test_relative_lineto(self):
        parsed = parse_svg_path("M 10,10 l 5,0 l 0,5 Z")
        assert len(parsed) == 1
        first, second, third = parsed[0][:3]
        assert first == (10.0, 10.0)
        assert second == (15.0, 10.0)
        assert third == (15.0, 15.0)

    def test_horizontal_vertical(self):
        parsed = parse_svg_path("M 0,0 H 10 V 10 Z")
        assert len(parsed) == 1
        points = parsed[0]
        assert (10.0, 0.0) in points
        assert (10.0, 10.0) in points

    def test_empty_path(self):
        assert parse_svg_path("") == []

    def test_move_only(self):
        parsed = parse_svg_path("M 5,5")
        assert len(parsed) == 1
        assert parsed[0] == [(5.0, 5.0)]

    def test_quadratic_bezier(self):
        parsed = parse_svg_path("M 0,0 Q 50,100 100,0 Z")
        assert len(parsed) == 1
        points = parsed[0]
        assert (100.0, 0.0) in points
        # Z closes back to the start.
        assert points[-1] == (0.0, 0.0)
# ---------------------------------------------------------------------------
# RDP simplification
# ---------------------------------------------------------------------------
class TestRdpSimplify:
    """Behaviour of Ramer-Douglas-Peucker simplification."""

    def test_collinear_points_reduced(self):
        """A straight run of points collapses to its two endpoints."""
        diagonal = [(0, 0), (1, 1), (2, 2), (3, 3), (4, 4)]
        simplified = rdp_simplify(diagonal, epsilon=0.1)
        assert len(simplified) == 2
        assert simplified[0] == (0, 0)
        assert simplified[-1] == (4, 4)

    def test_preserves_corners(self):
        """A right-angle corner survives a small tolerance."""
        corner = [(0, 0), (10, 0), (10, 10)]
        simplified = rdp_simplify(corner, epsilon=0.5)
        assert len(simplified) == 3

    def test_epsilon_zero_preserves_all(self):
        """With epsilon=0 every off-chord point is kept."""
        bump = [(0, 0), (5, 1), (10, 0)]
        simplified = rdp_simplify(bump, epsilon=0.0)
        assert len(simplified) == 3

    def test_high_epsilon_aggressive(self):
        """A large tolerance removes every interior point."""
        wobble = [(0, 0), (5, 0.5), (10, 0), (15, 0.3), (20, 0)]
        simplified = rdp_simplify(wobble, epsilon=10.0)
        assert len(simplified) == 2

    def test_two_points_unchanged(self):
        segment = [(0, 0), (10, 10)]
        assert rdp_simplify(segment, epsilon=1.0) == [(0, 0), (10, 10)]

    def test_single_point_unchanged(self):
        lone = [(5, 5)]
        assert rdp_simplify(lone, epsilon=1.0) == [(5, 5)]

    def test_empty_input(self):
        assert rdp_simplify([], epsilon=1.0) == []

    def test_reduces_node_count(self):
        """A densely sampled circle comes back with fewer nodes."""
        samples = 100
        circle = []
        for i in range(samples):
            x = 50 + 40 * math.cos(2 * math.pi * i / samples)
            y = 50 + 40 * math.sin(2 * math.pi * i / samples)
            circle.append((x, y))

        simplified = rdp_simplify(circle, epsilon=2.0)
        assert len(simplified) < len(circle)
        # What remains must still be able to form a polygon.
        assert len(simplified) >= 3
# ---------------------------------------------------------------------------
# Signed area / winding detection
# ---------------------------------------------------------------------------
class TestSignedArea:
    """Sign and magnitude of the shoelace area."""

    def test_ccw_square_positive(self):
        """The square listed in this vertex order yields a positive area."""
        ccw_square = [(0, 0), (100, 0), (100, 100), (0, 100)]
        area = signed_area(ccw_square)
        assert area > 0

    def test_cw_square_negative(self):
        """The same square in the opposite vertex order yields a negative area."""
        cw_square = [(0, 0), (0, 100), (100, 100), (100, 0)]
        area = signed_area(cw_square)
        assert area < 0

    def test_area_magnitude(self):
        """A 10x10 square has an area of magnitude 100."""
        small_square = [(0, 0), (10, 0), (10, 10), (0, 10)]
        magnitude = abs(signed_area(small_square))
        assert magnitude == pytest.approx(100.0)

    def test_degenerate_line(self):
        """Two points enclose no area."""
        segment = [(0, 0), (10, 10)]
        assert signed_area(segment) == 0.0

    def test_single_point(self):
        assert signed_area([(0, 0)]) == 0.0

    def test_empty(self):
        assert signed_area([]) == 0.0
# ---------------------------------------------------------------------------
# Island detection
# ---------------------------------------------------------------------------
class TestDetectIsland:
    """Island classification from vertex order."""

    def test_ccw_is_not_island(self):
        outer = [(0, 0), (100, 0), (100, 100), (0, 100)]
        verdict = detect_island(outer)
        assert verdict is False

    def test_cw_is_island(self):
        hole = [(0, 0), (0, 100), (100, 100), (100, 0)]
        verdict = detect_island(hole)
        assert verdict is True
# ---------------------------------------------------------------------------
# Open path detection + repair
# ---------------------------------------------------------------------------
class TestIsClosed:
    """Closed/open detection from the gap between the endpoints."""

    def test_closed_path(self):
        loop = [(0, 0), (10, 0), (10, 10), (0, 0)]
        assert is_closed(loop) is True

    def test_open_path(self):
        polyline = [(0, 0), (10, 0), (10, 10)]
        assert is_closed(polyline) is False

    def test_nearly_closed(self):
        """An end point inside the tolerance radius counts as closed."""
        almost = [(0, 0), (10, 0), (10, 10), (0.5, 0.3)]
        verdict = is_closed(almost, tolerance=1.0)
        assert verdict is True

    def test_single_point(self):
        lone = [(0, 0)]
        assert is_closed(lone) is False

    def test_empty(self):
        assert is_closed([]) is False
class TestClosePath:
    """Repair of open paths by appending the start point."""

    def test_closes_open_path(self):
        polyline = [(0, 0), (10, 0), (10, 10)]
        repaired = close_path(polyline)
        assert repaired[-1] == repaired[0]
        assert len(repaired) == 4

    def test_already_closed(self):
        loop = [(0, 0), (10, 0), (10, 10), (0, 0)]
        repaired = close_path(loop)
        # The start point must not be appended a second time.
        assert len(repaired) == 4

    def test_empty(self):
        repaired = close_path([])
        assert repaired == []
# ---------------------------------------------------------------------------
# Node counting
# ---------------------------------------------------------------------------
class TestNodeCount:
    """Node counting over coordinate lists."""

    def test_counts_nodes(self):
        three_points = [(0, 0), (1, 1), (2, 2)]
        assert node_count(three_points) == 3

    def test_empty(self):
        no_points = []
        assert node_count(no_points) == 0
# ---------------------------------------------------------------------------
# Full pipeline integration
# ---------------------------------------------------------------------------
class TestPostprocessSvg:
    """End-to-end checks of ``postprocess_svg`` on hand-built SVGs."""

    def test_returns_result_object(self):
        outcome = postprocess_svg(_make_svg(SQUARE_D))
        assert isinstance(outcome, PostProcessResult)

    def test_path_count(self):
        outcome = postprocess_svg(_make_svg(SQUARE_D))
        assert len(outcome.paths) >= 1

    def test_node_count_reduction(self):
        """Simplification never increases a path's node count."""
        outcome = postprocess_svg(_make_svg(SQUARE_D), epsilon=0.5)
        assert all(
            info.node_count <= info.original_node_count for info in outcome.paths
        )

    def test_total_nodes_tracked(self):
        outcome = postprocess_svg(_make_svg(SQUARE_D))
        expected_total = sum(info.node_count for info in outcome.paths)
        assert outcome.total_nodes == expected_total

    def test_closed_path_detected(self):
        outcome = postprocess_svg(_make_svg(SQUARE_D))
        # The square ends with Z, so at least one path must be closed.
        closed_flags = [info.is_closed for info in outcome.paths]
        assert any(closed_flags)

    def test_open_path_detected(self):
        outcome = postprocess_svg(_make_svg(OPEN_D))
        assert outcome.open_path_count >= 1

    def test_auto_close(self):
        outcome = postprocess_svg(_make_svg(OPEN_D), auto_close=True)
        # With auto-close enabled nothing may be left open.
        assert outcome.open_path_count == 0

    def test_island_detection(self):
        # One path in each vertex order inside the same ``d`` attribute.
        outer_and_hole = f"{SQUARE_D} {CW_SQUARE_D}"
        outcome = postprocess_svg(_make_svg(outer_and_hole))
        assert outcome.island_count >= 1

    def test_output_svg_is_well_formed(self):
        import xml.etree.ElementTree as ET

        outcome = postprocess_svg(_make_svg(SQUARE_D))
        document = ET.fromstring(outcome.svg)
        assert document.tag == "{http://www.w3.org/2000/svg}svg"

    def test_output_svg_has_path(self):
        import xml.etree.ElementTree as ET

        outcome = postprocess_svg(_make_svg(SQUARE_D))
        document = ET.fromstring(outcome.svg)
        namespaces = {"svg": "http://www.w3.org/2000/svg"}
        found = document.findall("svg:path", namespaces)
        assert len(found) >= 1

    def test_epsilon_affects_simplification(self):
        """A larger epsilon never yields more nodes than a smaller one."""
        # Closed 50-gon approximating a circle of radius 40 around (50, 50).
        sides = 50
        line_commands = []
        for i in range(1, sides):
            x = 50 + 40 * math.cos(2 * math.pi * i / sides)
            y = 50 + 40 * math.sin(2 * math.pi * i / sides)
            line_commands.append(f"L {x:.3f},{y:.3f}")
        points = " ".join(line_commands)

        x0 = 50 + 40 * math.cos(0)
        y0 = 50 + 40 * math.sin(0)
        d = f"M {x0:.3f},{y0:.3f} {points} Z"
        svg = _make_svg(d)

        fine = postprocess_svg(svg, epsilon=0.1)
        coarse = postprocess_svg(svg, epsilon=10.0)
        assert coarse.total_nodes <= fine.total_nodes
class TestPostprocessWithVectorizerOutput:
    """Integration tests: trace a synthetic image, then post-process the SVG."""

    def test_potrace_output(self):
        """Output of ``potrace_trace`` passes through post-processing."""
        import numpy as np
        from pipeline.vectorize import potrace_trace

        # Black 100x100 canvas with a white 60x60 square in the middle.
        canvas = np.zeros((100, 100), dtype=np.uint8)
        canvas[20:80, 20:80] = 255

        traced_svg = potrace_trace(canvas)
        outcome = postprocess_svg(traced_svg, epsilon=1.0)

        assert isinstance(outcome, PostProcessResult)
        assert len(outcome.paths) >= 1
        assert outcome.total_nodes > 0

    def test_vtracer_output(self):
        """Output of ``vtracer_trace`` passes through post-processing."""
        import numpy as np
        from pipeline.vectorize import vtracer_trace

        # Black 100x100 canvas with a white 60x60 square in the middle.
        canvas = np.zeros((100, 100), dtype=np.uint8)
        canvas[20:80, 20:80] = 255

        traced_svg = vtracer_trace(canvas)
        outcome = postprocess_svg(traced_svg, epsilon=1.0)

        assert isinstance(outcome, PostProcessResult)
        assert len(outcome.paths) >= 1
        assert outcome.total_nodes > 0