kerf-engine/engine/pipeline/postprocess.py
jlightner 4539a488bc fix: Implemented RDP path simplification, island/hole detection via win…
- engine/pipeline/postprocess.py
- engine/tests/test_postprocess.py

GSD-Task: S02/T01
2026-03-26 04:32:31 +00:00

414 lines
13 KiB
Python

"""Post-processing pipeline — RDP simplification, island detection, open path repair."""
from __future__ import annotations
import math
import re
import xml.etree.ElementTree as ET
from dataclasses import dataclass, field
@dataclass
class PathInfo:
    """Per-subpath record produced by the post-processing pipeline.

    Bundles the parsed geometry of one subpath, its simplified counterpart,
    and the flags and counts derived from them.
    """

    # Points as parsed from the path data, before simplification.
    original_coords: list[tuple[float, float]]
    # Points remaining after simplification (and optional closing).
    simplified_coords: list[tuple[float, float]]
    # True when the subpath is treated as closed.
    is_closed: bool
    # True when the subpath is classified as an island (hole).
    is_island: bool
    # Number of points in ``simplified_coords``.
    node_count: int
    # Number of points in ``original_coords``.
    original_node_count: int
    # Signed area; a negative value marks a clockwise island/hole.
    area: float
@dataclass
class PostProcessResult:
    """Summary of one post-processing run over an SVG document."""

    # One record per processed subpath.
    paths: list[PathInfo]
    # SVG markup rebuilt from the simplified subpaths.
    svg: str
    # Sum of the simplified node counts over all records.
    total_nodes: int
    # Sum of the pre-simplification node counts over all records.
    total_original_nodes: int
    # Number of records that are not closed.
    open_path_count: int
    # Number of records classified as islands.
    island_count: int
# ---------------------------------------------------------------------------
# SVG path parsing
# ---------------------------------------------------------------------------
# Splits path data on command letters; the capture group keeps every letter as
# its own token, so commands and their argument text alternate in the result.
_CMD_RE = re.compile(r"([MmLlHhVvCcSsQqTtAaZz])")
# One number: optional sign, digits with an optional fraction (or a bare
# leading-dot fraction), and an optional exponent.
_NUM_RE = re.compile(r"[+-]?(?:\d+\.?\d*|\.\d+)(?:[eE][+-]?\d+)?")
# Numbers consumed per segment by the commands that are reduced to their
# endpoint only (smooth cubic, smooth quadratic, elliptical arc).
_ENDPOINT_ONLY_ARITY = {"S": 4, "T": 2, "A": 7}


def _complete_groups(values: list[float], size: int) -> list[list[float]]:
    """Split `values` into consecutive groups of `size`, dropping any remainder."""
    return [values[k:k + size] for k in range(0, len(values) - size + 1, size)]


def _resolve_point(
    x: float, y: float, origin: tuple[float, float], relative: bool
) -> tuple[float, float]:
    """Return (x, y) as an absolute point, offsetting from `origin` if relative."""
    if relative:
        return (origin[0] + x, origin[1] + y)
    return (x, y)


def parse_svg_path(d: str) -> list[list[tuple[float, float]]]:
    """Parse an SVG path `d` attribute into a list of subpaths.

    Every subpath is a list of absolute (x, y) tuples. Line commands
    (M/L/H/V) contribute their vertices, C and Q contribute their control
    points followed by the endpoint (curves are not flattened), and S/T/A
    contribute one endpoint per segment. Z closes the current subpath by
    appending its start point when the last point differs from it.

    After a closepath the current point returns to the start of the subpath
    that was just closed, so a following relative command is measured from
    there, and a drawing command that follows Z without a moveto begins a
    new subpath at that start point.

    Args:
        d: Path data string. Incomplete trailing coordinate groups are
            ignored rather than raising.

    Returns:
        List of subpaths, each a list of (x, y) tuples.
    """
    # parts[0] is whatever precedes the first command letter; after it the
    # list alternates command letter, argument text.
    parts = _CMD_RE.split(d)
    subpaths: list[list[tuple[float, float]]] = []
    current: list[tuple[float, float]] = []
    pos: tuple[float, float] = (0.0, 0.0)
    subpath_start: tuple[float, float] | None = None
    # Set by closepath; tells the next drawing command to reopen at the start.
    resume_from_start = False

    for cmd, arg_text in zip(parts[1::2], parts[2::2]):
        if cmd in ("Z", "z"):
            if current and subpath_start is not None:
                if current[-1] != subpath_start:
                    current.append(subpath_start)
                subpaths.append(current)
                current = []
            if subpath_start is not None:
                # Closepath moves the current point back to the subpath start.
                pos = subpath_start
                resume_from_start = True
            continue

        nums = [float(n) for n in _NUM_RE.findall(arg_text)]
        relative = cmd.islower()
        kind = cmd.upper()

        if kind == "M":
            if current:
                subpaths.append(current)
                current = []
            resume_from_start = False
            # Pairs after the first are implicit line-to vertices.
            for index, (x, y) in enumerate(_complete_groups(nums, 2)):
                pos = _resolve_point(x, y, pos, relative)
                current.append(pos)
                if index == 0:
                    subpath_start = pos
            continue

        if resume_from_start:
            resume_from_start = False
            if not current:
                current.append(pos)

        if kind == "L":
            for x, y in _complete_groups(nums, 2):
                pos = _resolve_point(x, y, pos, relative)
                current.append(pos)
        elif kind == "H":
            for x in nums:
                pos = (pos[0] + x if relative else x, pos[1])
                current.append(pos)
        elif kind == "V":
            for y in nums:
                pos = (pos[0], pos[1] + y if relative else y)
                current.append(pos)
        elif kind == "C":
            for x1, y1, x2, y2, x, y in _complete_groups(nums, 6):
                # Relative control points are offsets from the segment start,
                # so resolve both before advancing the current point.
                current.append(_resolve_point(x1, y1, pos, relative))
                current.append(_resolve_point(x2, y2, pos, relative))
                pos = _resolve_point(x, y, pos, relative)
                current.append(pos)
        elif kind == "Q":
            for x1, y1, x, y in _complete_groups(nums, 4):
                current.append(_resolve_point(x1, y1, pos, relative))
                pos = _resolve_point(x, y, pos, relative)
                current.append(pos)
        elif kind in _ENDPOINT_ONLY_ARITY:
            arity = _ENDPOINT_ONLY_ARITY[kind]
            if len(nums) % arity == 0:
                groups = _complete_groups(nums, arity)
            elif len(nums) >= 2:
                # Unexpected count (e.g. arc flags written without
                # separators): best effort, use the final pair as endpoint.
                groups = [nums]
            else:
                groups = []
            for group in groups:
                pos = _resolve_point(group[-2], group[-1], pos, relative)
                current.append(pos)

    if current:
        subpaths.append(current)
    return subpaths
# ---------------------------------------------------------------------------
# RDP simplification
# ---------------------------------------------------------------------------
def _perpendicular_distance(
    point: tuple[float, float],
    line_start: tuple[float, float],
    line_end: tuple[float, float],
) -> float:
    """Distance from `point` to the infinite line through two points.

    The projection is not clamped to the segment, so a point lying on the
    line beyond either endpoint still measures zero. When `line_start` and
    `line_end` coincide, the plain distance to that point is returned.
    """
    px, py = point
    x1, y1 = line_start
    x2, y2 = line_end
    dx = x2 - x1
    dy = y2 - y1
    length_sq = dx * dx + dy * dy
    if length_sq == 0:
        # Degenerate chord: no direction to be perpendicular to.
        return math.hypot(px - x1, py - y1)
    # |cross product| of the chord with the point offset, over chord length.
    cross = abs(dy * px - dx * py + x2 * y1 - y2 * x1)
    return cross / math.sqrt(length_sq)
def rdp_simplify(
    coords: list[tuple[float, float]], epsilon: float = 1.0
) -> list[tuple[float, float]]:
    """Apply Ramer-Douglas-Peucker simplification to a coordinate list.

    Implemented iteratively with an explicit work stack, so the length of
    the input is not limited by the interpreter's recursion limit. A range
    is split at the first interior point whose distance from the chord
    between the range endpoints is the largest, provided that distance
    exceeds `epsilon`; otherwise all interior points of the range are dropped.

    Args:
        coords: List of (x, y) tuples.
        epsilon: Distance threshold — higher values produce simpler paths.
            A negative value keeps every point.

    Returns:
        Simplified list of (x, y) tuples; the first and last input points
        are always retained. A new list is returned even for inputs of two
        points or fewer.
    """
    count = len(coords)
    if count <= 2:
        return list(coords)

    keep = [False] * count
    keep[0] = keep[-1] = True
    pending = [(0, count - 1)]

    while pending:
        first, last = pending.pop()
        if last - first < 2:
            continue  # no interior points to examine

        x1, y1 = coords[first]
        x2, y2 = coords[last]
        dx = x2 - x1
        dy = y2 - y1
        length_sq = dx * dx + dy * dy

        # Starting the running maximum at epsilon means only points beyond
        # the threshold can be selected, and the strict comparison keeps the
        # first of any equally distant points.
        farthest = epsilon
        split_at = -1

        if length_sq == 0:
            # Endpoints coincide (e.g. a closed ring): use point distance.
            for idx in range(first + 1, last):
                px, py = coords[idx]
                dist = math.hypot(px - x1, py - y1)
                if dist > farthest:
                    farthest = dist
                    split_at = idx
        else:
            # Chord invariants are computed once per range. The expression
            # order mirrors _perpendicular_distance so results agree exactly.
            length = math.sqrt(length_sq)
            term_a = x2 * y1
            term_b = y2 * x1
            for idx in range(first + 1, last):
                px, py = coords[idx]
                dist = abs(dy * px - dx * py + term_a - term_b) / length
                if dist > farthest:
                    farthest = dist
                    split_at = idx

        if split_at != -1:
            keep[split_at] = True
            pending.append((first, split_at))
            pending.append((split_at, last))

    return [pt for pt, kept in zip(coords, keep) if kept]
# ---------------------------------------------------------------------------
# Geometric analysis
# ---------------------------------------------------------------------------
def signed_area(coords: list[tuple[float, float]]) -> float:
    """Signed polygon area via the shoelace formula.

    The vertex list is treated as a ring: the last vertex is paired with the
    first. Fewer than three vertices yield 0.0.

    The result is positive when the vertices run counter-clockwise in a
    y-up frame; this module reads positive as an outer contour and negative
    as a clockwise island / hole.
    NOTE(review): on a y-down SVG canvas the on-screen direction is the
    mirror image — confirm the convention against the tracer output.
    """
    if len(coords) < 3:
        return 0.0

    total = 0.0
    # Pair every vertex with its successor, wrapping the last onto the first.
    successors = coords[1:] + coords[:1]
    for here, nxt in zip(coords, successors):
        total += here[0] * nxt[1]
        total -= nxt[0] * here[1]
    return total / 2.0
def is_closed(coords: list[tuple[float, float]], tolerance: float = 1.0) -> bool:
    """Report whether the first and last points lie within `tolerance`.

    A list with fewer than two points is never considered closed. The
    comparison is inclusive: a gap exactly equal to `tolerance` counts.
    """
    if len(coords) < 2:
        return False
    first, last = coords[0], coords[-1]
    gap = math.hypot(last[0] - first[0], last[1] - first[1])
    return gap <= tolerance
def close_path(coords: list[tuple[float, float]]) -> list[tuple[float, float]]:
    """Return the path with its start point repeated at the end.

    An empty input is handed back as-is (the same object). Otherwise a new
    list is returned: a copy when the last point already equals the first,
    or the input followed by its first point.
    """
    if not coords:
        return coords
    needs_closing = coords[-1] != coords[0]
    return coords + [coords[0]] if needs_closing else list(coords)
def detect_island(coords: list[tuple[float, float]]) -> bool:
    """Classify a closed path as an island (hole) from its winding.

    Returns True exactly when the path's signed area is negative, which this
    module reads as clockwise winding: a hole inside an outer contour when
    the result is rendered with fill-rule="evenodd".
    """
    area = signed_area(coords)
    return area < 0
# ---------------------------------------------------------------------------
# Node counting
# ---------------------------------------------------------------------------
def node_count(coords: list[tuple[float, float]]) -> int:
    """Return how many points the path holds.

    This is the plain length of the list; repeated points (such as the
    duplicated start point of a closed path) are each counted.
    """
    total = len(coords)
    return total
# ---------------------------------------------------------------------------
# Full post-processing pipeline
# ---------------------------------------------------------------------------
def postprocess_svg(
    svg_str: str,
    epsilon: float = 1.0,
    close_tolerance: float = 1.0,
    auto_close: bool = False,
) -> PostProcessResult:
    """Run the full post-processing pipeline on an SVG string.

    1. Parse the path data of every ``<path>`` element in the document
    2. Apply RDP simplification with the given epsilon
    3. Detect and optionally repair open paths
    4. Detect islands (clockwise winding) among the closed paths
    5. Count nodes per path

    ``<path>`` elements are collected at any depth, so paths wrapped in
    ``<g>`` groups are processed as well. Namespaced SVG paths take
    precedence; un-namespaced ``path`` elements are used only when no
    namespaced one exists.

    Args:
        svg_str: Input SVG string.
        epsilon: RDP simplification tolerance.
        close_tolerance: Distance threshold for considering a path closed.
        auto_close: If True, append start point to open paths.

    Returns:
        PostProcessResult with per-path metadata and rebuilt SVG.

    Raises:
        xml.etree.ElementTree.ParseError: If `svg_str` is not well-formed XML.
    """
    # NOTE(review): ElementTree is not hardened against maliciously
    # constructed XML — confirm svg_str only ever comes from a trusted source.
    root = ET.fromstring(svg_str)
    ns = {"svg": "http://www.w3.org/2000/svg"}

    # ".//" searches all descendants; a bare tag name would match only direct
    # children of the root and miss paths nested inside groups.
    # NOTE(review): `transform` attributes on paths or enclosing groups are
    # not applied; coordinates are taken verbatim from the `d` attribute.
    path_elements = root.findall(".//svg:path", ns) or root.findall(".//path")

    path_infos: list[PathInfo] = []
    for path_el in path_elements:
        d = path_el.get("d", "")
        if not d.strip():
            continue

        for coords in parse_svg_path(d):
            if len(coords) < 2:
                continue  # a lone point carries no geometry

            original_count = node_count(coords)
            simplified = rdp_simplify(coords, epsilon)

            closed = is_closed(simplified, close_tolerance)
            if auto_close and not closed:
                simplified = close_path(simplified)
                closed = True

            # Winding is only meaningful for a closed ring of >= 3 points.
            island = (
                detect_island(simplified)
                if closed and len(simplified) >= 3
                else False
            )
            area = signed_area(simplified)

            path_infos.append(
                PathInfo(
                    original_coords=coords,
                    simplified_coords=simplified,
                    is_closed=closed,
                    is_island=island,
                    node_count=node_count(simplified),
                    original_node_count=original_count,
                    area=area,
                )
            )

    # Rebuild SVG with simplified paths
    rebuilt_svg = _rebuild_svg(root, path_infos)

    total_nodes = sum(p.node_count for p in path_infos)
    total_original = sum(p.original_node_count for p in path_infos)
    open_count = sum(1 for p in path_infos if not p.is_closed)
    island_count = sum(1 for p in path_infos if p.is_island)

    return PostProcessResult(
        paths=path_infos,
        svg=rebuilt_svg,
        total_nodes=total_nodes,
        total_original_nodes=total_original,
        open_path_count=open_count,
        island_count=island_count,
    )
def _rebuild_svg(root: ET.Element, path_infos: list[PathInfo]) -> str:
    """Rebuild SVG string from post-processed path data.

    All subpaths are emitted as polylines (M followed by L commands, plus Z
    for closed ones) inside a single ``<path>`` element filled with the
    even-odd rule. Records with fewer than two simplified points are skipped.

    Args:
        root: Root element of the source SVG; supplies `width`, `height`
            and `viewBox`. Missing width/height default to "100" and a
            missing viewBox defaults to "0 0 <width> <height>".
        path_infos: Post-processed records; only `simplified_coords` and
            `is_closed` are read.

    Returns:
        A self-contained SVG document as a string.
    """
    # Local import keeps this block self-contained.
    from xml.sax.saxutils import escape

    def attr(value: str) -> str:
        # Values copied from the source document may contain quotes or
        # markup characters; escape them so the output stays well-formed.
        return escape(value, {'"': "&quot;"})

    width = root.get("width", "100")
    height = root.get("height", "100")
    viewbox = root.get("viewBox", f"0 0 {width} {height}")

    subpath_strings = []
    for info in path_infos:
        points = info.simplified_coords
        if len(points) < 2:
            continue
        x0, y0 = points[0]
        commands = [f"M {x0:.3f},{y0:.3f}"]
        commands.extend(f"L {x:.3f},{y:.3f}" for x, y in points[1:])
        if info.is_closed:
            commands.append("Z")
        subpath_strings.append(" ".join(commands))

    d = " ".join(subpath_strings)
    return (
        f'<svg xmlns="http://www.w3.org/2000/svg" '
        f'width="{attr(width)}" height="{attr(height)}" '
        f'viewBox="{attr(viewbox)}">'
        f'<path d="{d}" fill="black" fill-rule="evenodd" stroke="none"/>'
        f"</svg>"
    )