414 lines
13 KiB
Python
414 lines
13 KiB
Python
"""Post-processing pipeline — RDP simplification, island detection, open path repair."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import math
|
|
import re
|
|
import xml.etree.ElementTree as ET
|
|
from dataclasses import dataclass, field
|
|
|
|
|
|
@dataclass
class PathInfo:
    """Per-subpath record carrying geometry and derived measurements.

    Attributes:
        original_coords: Points of the subpath before simplification.
        simplified_coords: Points of the subpath after simplification.
        is_closed: Whether the subpath is treated as closed.
        is_island: Whether the subpath was classified as an island (hole).
        node_count: Number of points in the simplified subpath.
        original_node_count: Number of points before simplification.
        area: Signed area of the simplified subpath.
    """

    # --- geometry ---------------------------------------------------------
    original_coords: list[tuple[float, float]]
    simplified_coords: list[tuple[float, float]]

    # --- classification ---------------------------------------------------
    is_closed: bool
    is_island: bool

    # --- measurements -----------------------------------------------------
    node_count: int
    original_node_count: int
    area: float  # sign encodes winding; a negative value marks an island/hole
|
|
|
|
|
|
@dataclass
class PostProcessResult:
    """Summary returned after post-processing a whole SVG document.

    Attributes:
        paths: One ``PathInfo`` per processed subpath.
        svg: The rebuilt SVG markup.
        total_nodes: Sum of node counts after simplification.
        total_original_nodes: Sum of node counts before simplification.
        open_path_count: Number of subpaths not flagged as closed.
        island_count: Number of subpaths flagged as islands.
    """

    # --- per-path detail and regenerated document -------------------------
    paths: list[PathInfo]
    svg: str

    # --- aggregate counters -----------------------------------------------
    total_nodes: int
    total_original_nodes: int
    open_path_count: int
    island_count: int
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# SVG path parsing
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Splits path data on SVG command letters; the capturing group makes
# ``re.split`` keep the letters, yielding [text, cmd, args, cmd, args, ...].
_CMD_RE = re.compile(r"([MmLlHhVvCcSsQqTtAaZz])")
# An optionally signed decimal number with an optional exponent.
_NUM_RE = re.compile(r"[+-]?(?:\d+\.?\d*|\.\d+)(?:[eE][+-]?\d+)?")

# For each pair-based command: (numbers per segment, offsets of the (x, y)
# pairs that are emitted).  The last offset is always the segment endpoint.
# C and Q emit their control points as well; S, T and A emit the endpoint only.
_SEGMENT_LAYOUT = {
    "M": (2, (0,)),
    "L": (2, (0,)),
    "C": (6, (0, 2, 4)),
    "Q": (4, (0, 2)),
    "S": (4, (2,)),
    "T": (2, (0,)),
    "A": (7, (5,)),
}


def parse_svg_path(d: str) -> list[list[tuple[float, float]]]:
    """Parse an SVG path ``d`` attribute into a list of subpaths.

    Each subpath is a list of absolute (x, y) tuples.  Curves are not
    flattened: C/Q contribute their control points and endpoint, while
    S/T/A contribute only the endpoint of every segment.  Extra coordinate
    groups after a command are treated as repetitions of that command
    (extra pairs after M/m become line-to points).  Trailing numbers that
    do not form a complete segment are ignored.

    Closepath handling:
      * Z/z appends the subpath's start point if the subpath does not
        already end there, then finishes the subpath.
      * The current point is reset to the subpath's start, so a following
        relative command is resolved from the correct position.
      * A drawing command that directly follows Z/z (without a move-to)
        starts a new subpath at that same start point.

    Args:
        d: Raw path data string.

    Returns:
        List of subpaths, each a list of (x, y) tuples.
    """
    subpaths: list[list[tuple[float, float]]] = []
    current: list[tuple[float, float]] = []
    cx, cy = 0.0, 0.0
    subpath_start: tuple[float, float] | None = None
    # True between a closepath and the next command that yields points.
    restart_pending = False

    tokens = _CMD_RE.split(d)
    # tokens[0] is whatever precedes the first command; it carries no
    # geometry and is skipped.  The split always ends with a text token,
    # so commands and argument strings pair up exactly.
    for cmd, arg_text in zip(tokens[1::2], tokens[2::2]):
        if cmd in ("Z", "z"):
            if subpath_start is not None:
                if current:
                    if current[-1] != subpath_start:
                        current.append(subpath_start)
                    subpaths.append(current)
                    current = []
                # After closepath the current point is the subpath start.
                cx, cy = subpath_start
                restart_pending = True
            continue

        nums = [float(n) for n in _NUM_RE.findall(arg_text)]
        relative = cmd.islower()
        kind = cmd.upper()
        points: list[tuple[float, float]] = []

        if kind in ("H", "V"):
            # Single-axis line-tos: one number per segment.
            for value in nums:
                if kind == "H":
                    cx = cx + value if relative else value
                else:
                    cy = cy + value if relative else value
                points.append((cx, cy))
        else:
            arity, offsets = _SEGMENT_LAYOUT[kind]
            for base in range(0, len(nums) - arity + 1, arity):
                # Every relative pair in a segment is measured from the
                # current point at the START of that segment.
                origin_x, origin_y = cx, cy
                for off in offsets:
                    x, y = nums[base + off], nums[base + off + 1]
                    if relative:
                        x, y = origin_x + x, origin_y + y
                    points.append((x, y))
                cx, cy = points[-1]

        if kind == "M":
            # A move-to always begins a fresh subpath.
            if current:
                subpaths.append(current)
            current = []
            restart_pending = False
            if points:
                subpath_start = points[0]
        elif points and restart_pending:
            # Drawing right after a closepath: reopen at the same start.
            current = [subpath_start]
            restart_pending = False

        current.extend(points)

    if current:
        subpaths.append(current)

    return subpaths
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# RDP simplification
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _perpendicular_distance(
    point: tuple[float, float],
    line_start: tuple[float, float],
    line_end: tuple[float, float],
) -> float:
    """Distance from ``point`` to the infinite line through two points.

    When ``line_start`` and ``line_end`` coincide the line is undefined,
    so the plain distance from ``point`` to ``line_start`` is returned.
    """
    dx = line_end[0] - line_start[0]
    dy = line_end[1] - line_start[1]
    length_sq = dx * dx + dy * dy
    if length_sq == 0:
        return math.hypot(point[0] - line_start[0], point[1] - line_start[1])
    # |cross product| of the line direction and the vector to the point,
    # divided by the line length.
    num = abs(dy * point[0] - dx * point[1] + line_end[0] * line_start[1] - line_end[1] * line_start[0])
    return num / math.sqrt(length_sq)


def rdp_simplify(
    coords: list[tuple[float, float]], epsilon: float = 1.0
) -> list[tuple[float, float]]:
    """Apply Ramer-Douglas-Peucker simplification to a coordinate list.

    The work is done iteratively on index ranges with an explicit stack,
    so neither the path length nor its shape can exhaust Python's
    recursion limit, and no intermediate list slices are created.

    Args:
        coords: List of (x, y) tuples.
        epsilon: Distance threshold — higher values produce simpler paths.
            A point is kept only if its distance is strictly greater than
            epsilon, so a negative epsilon keeps every point.

    Returns:
        Simplified list of (x, y) tuples (always a new list).  The first
        and last points are always retained.
    """
    count = len(coords)
    if count <= 2:
        return list(coords)

    keep = [False] * count
    keep[0] = keep[-1] = True

    pending = [(0, count - 1)]
    while pending:
        lo, hi = pending.pop()
        if hi - lo < 2:
            continue  # no interior points to examine

        start, end = coords[lo], coords[hi]
        # Start below any real distance so an interior index is always
        # selected; this guarantees every split strictly shrinks the range.
        max_dist = -1.0
        max_idx = lo
        for idx in range(lo + 1, hi):
            dist = _perpendicular_distance(coords[idx], start, end)
            if dist > max_dist:
                max_dist = dist
                max_idx = idx

        if max_idx > lo and max_dist > epsilon:
            keep[max_idx] = True
            pending.append((lo, max_idx))
            pending.append((max_idx, hi))

    return [pt for pt, wanted in zip(coords, keep) if wanted]
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Geometric analysis
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def signed_area(coords: list[tuple[float, float]]) -> float:
    """Return the signed polygon area via the shoelace formula.

    The polygon is implicitly closed (the last vertex connects back to
    the first).  Fewer than three vertices yield ``0.0``.

    Sign convention: the result is positive when the vertices run
    counter-clockwise in a y-up coordinate system and negative when they
    run clockwise.  This module treats a negative area as an island/hole.
    """
    if len(coords) < 3:
        return 0.0

    # Pair every vertex with its successor, wrapping around at the end.
    successors = coords[1:] + coords[:1]

    twice_area = 0.0
    for (x_here, y_here), (x_next, y_next) in zip(coords, successors):
        twice_area += x_here * y_next
        twice_area -= x_next * y_here
    return twice_area / 2.0
|
|
|
|
|
|
def is_closed(coords: list[tuple[float, float]], tolerance: float = 1.0) -> bool:
    """Report whether the first and last points lie within ``tolerance``.

    Paths with fewer than two points are never considered closed.
    """
    if len(coords) >= 2:
        (first_x, first_y), (last_x, last_y) = coords[0], coords[-1]
        gap = math.hypot(last_x - first_x, last_y - first_y)
        return gap <= tolerance
    return False
|
|
|
|
|
|
def close_path(coords: list[tuple[float, float]]) -> list[tuple[float, float]]:
    """Return the path with its start point repeated at the end.

    A path that already ends exactly on its start point is returned as a
    copy.  An empty input is handed back unchanged (the same list object).
    """
    if not coords:
        return coords

    start = coords[0]
    already_closed = coords[-1] == start
    return list(coords) if already_closed else coords + [start]
|
|
|
|
|
|
def detect_island(coords: list[tuple[float, float]]) -> bool:
    """Classify a closed path as an island (hole) from its winding.

    A path counts as an island exactly when ``signed_area`` reports a
    negative value for it; zero-area and positive-area paths do not.
    """
    winding_area = signed_area(coords)
    return winding_area < 0
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Node counting
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def node_count(coords: list[tuple[float, float]]) -> int:
    """Return how many coordinate entries the path holds.

    Every entry is counted, including repeated points such as a closing
    point that duplicates the start.
    """
    total = len(coords)
    return total
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Full post-processing pipeline
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def postprocess_svg(
    svg_str: str,
    epsilon: float = 1.0,
    close_tolerance: float = 1.0,
    auto_close: bool = False,
) -> PostProcessResult:
    """Simplify and analyse every path of an SVG document.

    For each subpath of each ``<path>`` element the pipeline:

    1. parses the path data into coordinates,
    2. simplifies them with RDP using ``epsilon``,
    3. decides whether the result is closed (within ``close_tolerance``),
       closing it explicitly when ``auto_close`` is set,
    4. classifies closed subpaths with at least three points as islands
       based on winding, and
    5. records node counts and the signed area.

    Only ``<path>`` elements that are direct children of the root are
    examined; SVG-namespaced ones are preferred, with un-namespaced ones
    used when none are found.  Elements with blank path data and subpaths
    with fewer than two points are skipped.

    Args:
        svg_str: Input SVG string.
        epsilon: RDP simplification tolerance.
        close_tolerance: Distance threshold for considering a path closed.
        auto_close: If True, append start point to open paths.

    Returns:
        PostProcessResult with per-path metadata and rebuilt SVG.
    """
    document = ET.fromstring(svg_str)

    path_elements = document.findall(
        "svg:path", {"svg": "http://www.w3.org/2000/svg"}
    )
    if not path_elements:
        path_elements = document.findall("path")

    records: list[PathInfo] = []
    for element in path_elements:
        path_data = element.get("d", "")
        if not path_data.strip():
            continue

        for raw_points in parse_svg_path(path_data):
            if len(raw_points) < 2:
                continue

            nodes_before = node_count(raw_points)
            reduced = rdp_simplify(raw_points, epsilon)

            closed = is_closed(reduced, close_tolerance)
            if not closed and auto_close:
                reduced = close_path(reduced)
                closed = True

            # Island detection is only meaningful for closed polygons.
            if closed and len(reduced) >= 3:
                island = detect_island(reduced)
            else:
                island = False

            record = PathInfo(
                original_coords=raw_points,
                simplified_coords=reduced,
                is_closed=closed,
                is_island=island,
                node_count=node_count(reduced),
                original_node_count=nodes_before,
                area=signed_area(reduced),
            )
            records.append(record)

    # Rebuild SVG with simplified paths
    rebuilt = _rebuild_svg(document, records)

    nodes_after_total = 0
    nodes_before_total = 0
    open_total = 0
    island_total = 0
    for record in records:
        nodes_after_total += record.node_count
        nodes_before_total += record.original_node_count
        if not record.is_closed:
            open_total += 1
        if record.is_island:
            island_total += 1

    return PostProcessResult(
        paths=records,
        svg=rebuilt,
        total_nodes=nodes_after_total,
        total_original_nodes=nodes_before_total,
        open_path_count=open_total,
        island_count=island_total,
    )
|
|
|
|
|
|
def _rebuild_svg(root: ET.Element, path_infos: list[PathInfo]) -> str:
    """Rebuild SVG string from post-processed path data.

    All subpaths are merged into a single ``<path>`` element made of
    ``M``/``L`` commands (plus ``Z`` for closed subpaths), with coordinates
    formatted to three decimals.  Subpaths with fewer than two points are
    omitted.

    ``width``, ``height`` and ``viewBox`` are copied from ``root``;
    missing width/height default to ``"100"`` and a missing viewBox
    defaults to ``"0 0 <width> <height>"``.  The copied values are
    XML-escaped before being written, because the parser hands back
    decoded attribute text that may contain ``"``, ``&`` or ``<``.

    Args:
        root: Root element of the parsed source SVG.
        path_infos: Records providing ``simplified_coords`` and
            ``is_closed`` for every subpath.

    Returns:
        The rebuilt SVG document as a string.
    """
    from xml.sax.saxutils import escape

    def _attr(value: str) -> str:
        # Escape &, <, > and the double quote used to delimit attributes.
        return escape(value, {'"': "&quot;"})

    width = root.get("width", "100")
    height = root.get("height", "100")
    viewbox = root.get("viewBox", f"0 0 {width} {height}")

    path_parts = []
    for info in path_infos:
        points = info.simplified_coords
        if len(points) < 2:
            continue
        x0, y0 = points[0]
        commands = [f"M {x0:.3f},{y0:.3f}"]
        commands.extend(f"L {x:.3f},{y:.3f}" for x, y in points[1:])
        if info.is_closed:
            commands.append("Z")
        path_parts.append(" ".join(commands))

    d = " ".join(path_parts)

    return (
        f'<svg xmlns="http://www.w3.org/2000/svg" '
        f'width="{_attr(width)}" height="{_attr(height)}" '
        f'viewBox="{_attr(viewbox)}">'
        f'<path d="{d}" fill="black" fill-rule="evenodd" stroke="none"/>'
        f"</svg>"
    )
|