fix: Implemented RDP path simplification, island/hole detection via win…

- engine/pipeline/postprocess.py
- engine/tests/test_postprocess.py

GSD-Task: S02/T01
This commit is contained in:
jlightner 2026-03-26 04:32:31 +00:00
parent 291a810605
commit 4539a488bc
10 changed files with 1036 additions and 3 deletions

View file

@ -8,3 +8,5 @@
|---|------|-------|----------|--------|-----------|------------|---------| |---|------|-------|----------|--------|-----------|------------|---------|
| D001 | | architecture | Engine/App architectural relationship | Engine is standalone module, App is a consumer. Zero coupling — App calls Engine only via HTTP API. | Engine is proprietary IP that must be embeddable into future applications independently. Clean input/output contracts enable this. | No | human | | D001 | | architecture | Engine/App architectural relationship | Engine is standalone module, App is a consumer. Zero coupling — App calls Engine only via HTTP API. | Engine is proprietary IP that must be embeddable into future applications independently. Clean input/output contracts enable this. | No | human |
| D002 | | architecture | Build order and gating strategy | Build Engine first (M001), then App canvas (M002), then Export+Deploy+Embed (M003). Human checkpoints gate each transition. | Brief explicitly mandates: validate engine output quality before building canvas UI, validate canvas before export/deploy. Engine is the hardest and most valuable piece. | No | human | | D002 | | architecture | Build order and gating strategy | Build Engine first (M001), then App canvas (M002), then Export+Deploy+Embed (M003). Human checkpoints gate each transition. | Brief explicitly mandates: validate engine output quality before building canvas UI, validate canvas before export/deploy. Engine is the hardest and most valuable piece. | No | human |
| D003 | 2026-03-26 | engine | SVG metadata extraction method | Regex-based extraction from SVG string, not XML parsing | Potrace and VTracer output well-known SVG structures. Regex avoids adding lxml/defusedxml dependency for a simple path/node count. May need revisiting if external SVGs are processed. | Yes | agent |
| D004 | 2026-03-26 | engine | Vectorize param filtering | User params filtered by allowlist of known param names per mode before passing to vectorize functions | Prevents unexpected kwargs from reaching potrace/vtracer backends. New params for future stages must be explicitly added to the filter in routes.py. | Yes | agent |

View file

@ -12,9 +12,13 @@ Agents read this before every unit. Add entries when you discover something wort
| # | Pattern | Where | Notes | | # | Pattern | Where | Notes |
|---|---------|-------|-------| |---|---------|-------|-------|
| P001 | Test images generated programmatically via numpy | engine/tests/ | No fixture image files checked in. Tests create shapes (rectangles, circles) with numpy + cv2. Continue this pattern in S02/S03. |
| P002 | Tests must use .venv/bin/python -m pytest | engine/ | No system-wide `python` on PATH. Bare `python -m pytest` fails with exit 127. Always use the venv binary. |
## Lessons Learned ## Lessons Learned
| # | What Happened | Root Cause | Fix | Scope | | # | What Happened | Root Cause | Fix | Scope |
|---|--------------|------------|-----|-------| |---|--------------|------------|-----|-------|
| L001 | pypotrace fails to build from pip | Requires system packages: `libpotrace-dev`, `libagg-dev`, `pkg-config` | `apt-get install -y libpotrace-dev libagg-dev pkg-config` before `pip install pypotrace` | engine build, Docker | | L001 | pypotrace fails to build from pip | Requires system packages: `libpotrace-dev`, `libagg-dev`, `pkg-config` | `apt-get install -y libpotrace-dev libagg-dev pkg-config` before `pip install pypotrace` | engine build, Docker |
| L002 | VTracer Python bindings work directly — no subprocess needed | vtracer pip package exposes `convert_raw_image_to_svg()` that accepts PNG bytes | Use `vtracer.convert_raw_image_to_svg(png_bytes, img_format="png", ...)` | engine vectorize |
| L003 | pypotrace Bitmap requires uint32 data | Passing other dtypes (uint8, float) can cause segfaults | Always cast: `(img > 0).astype(np.uint32)` before `potrace.Bitmap(data)` | engine vectorize |

View file

@ -6,6 +6,6 @@ Build and validate the standalone Kerf Engine: a stateless HTTP API that accepts
## Slice Overview ## Slice Overview
| ID | Slice | Risk | Depends | Done | After this | | ID | Slice | Risk | Depends | Done | After this |
|----|-------|------|---------|------|------------| |----|-------|------|---------|------|------------|
| S01 | Core Pipeline — Preprocessing + Vectorization | high — dependency installation, opencv+potrace+vtracer integration | — | | POST /engine/trace with a PNG logo returns valid SVG using both Potrace and VTracer modes | | S01 | Core Pipeline — Preprocessing + Vectorization | high — dependency installation, opencv+potrace+vtracer integration | — | | POST /engine/trace with a PNG logo returns valid SVG using both Potrace and VTracer modes |
| S02 | Post-Processing + Output Formats (SVG, DXF, JSON) | high — dxf generation quality is hard to validate programmatically | S01 | ⬜ | /engine/trace returns valid DXF and JSON output; /engine/simplify reduces node count on complex SVG | | S02 | Post-Processing + Output Formats (SVG, DXF, JSON) | high — dxf generation quality is hard to validate programmatically | S01 | ⬜ | /engine/trace returns valid DXF and JSON output; /engine/simplify reduces node count on complex SVG |
| S03 | Preset System + Engine Docker Packaging | low — presets are config files; docker packaging is well-understood | S02 | ⬜ | GET /engine/presets returns all presets; each preset produces distinct output from same input; engine runs in Docker | | S03 | Preset System + Engine Docker Packaging | low — presets are config files; docker packaging is well-understood | S02 | ⬜ | GET /engine/presets returns all presets; each preset produces distinct output from same input; engine runs in Docker |

View file

@ -49,4 +49,29 @@ Created `engine/api/routes.py` with POST /engine/trace endpoint. Accepts multipa
Full test suite: `cd engine && .venv/bin/python -m pytest tests/ -v` — 76 passed Full test suite: `cd engine && .venv/bin/python -m pytest tests/ -v` — 76 passed
## Demo ## Demo
POST /engine/trace with a PNG logo returns valid SVG using both Potrace and VTracer modes. ✅ POST /engine/trace with a PNG logo returns valid SVG using both Potrace and VTracer modes.
## Known Limitations
- Output format is SVG-only — DXF and JSON deferred to S02
- `preset` parameter accepted but ignored — preset system deferred to S03
- No Docker packaging yet — deferred to S03
- Tests must run with `.venv/bin/python -m pytest`, not bare `python`
## Forward Intelligence
### What the next slice should know
- Response shape: `{output: str, format: str, metadata: {path_count, node_count_total, open_paths, warnings, processing_ms}}`. S02 must extend for DXF/JSON without breaking this contract.
- VTracer returns SVG with XML declaration + generator comment; Potrace's `_path_to_svg()` returns bare SVG. Post-processing must handle both.
- User params are filtered by known names per mode in routes.py lines 80-91. New params for S02 post-processing stages must be added to these filter lists.
### What's fragile
- `_extract_svg_metadata()` regex assumes `<path d="...">` — if output format changes, metadata counts silently return 0.
- pypotrace Bitmap requires uint32 data — feeding other dtypes directly will segfault (see L003 in KNOWLEDGE.md).
### Authoritative diagnostics
- `engine/.venv/bin/python -m pytest tests/ -v` — full pipeline health check, runs in <1s.
- `GET /health` returns `{"status": "ok"}` — confirms FastAPI is running.
### What assumptions changed
- VTracer Python bindings work via `convert_raw_image_to_svg()` with PNG bytes — no subprocess needed (original plan suggested subprocess might be required).
- Test images are generated programmatically via numpy, not loaded from fixture files.

View file

@ -0,0 +1,122 @@
# S01: Core Pipeline — Preprocessing + Vectorization — UAT
**Milestone:** M001
**Written:** 2026-03-26
## UAT Type
- UAT mode: artifact-driven
- Why this mode is sufficient: All pipeline stages are pure functions with deterministic output from synthetic inputs. The 76 automated tests cover the full pipeline from raw bytes to SVG response. No live runtime or human-visual judgment is needed at this stage.
## Preconditions
- Python venv is set up: `engine/.venv/bin/python` exists
- All dependencies installed: `cd engine && .venv/bin/python -c "import cv2, potrace, vtracer, fastapi"` exits 0
- System C libraries present: `dpkg -l libpotrace-dev libagg-dev pkg-config` shows installed
## Smoke Test
```bash
cd engine && .venv/bin/python -m pytest tests/ -v --tb=short
```
**Expected:** 76 passed, 0 failed, 0 errors. Exit code 0.
## Test Cases
### 1. Full test suite passes
1. `cd engine && .venv/bin/python -m pytest tests/ -v`
2. **Expected:** 76 tests pass — 24 preprocessing, 38 vectorize, 14 API integration
### 2. Potrace mode produces valid SVG from PNG
1. `cd engine && .venv/bin/python -m pytest tests/test_api.py::TestTraceEndpointPotrace -v`
2. **Expected:** 4 tests pass — basic trace returns SVG, metadata has correct shape, SVG contains `<path>` elements, custom params are accepted
### 3. VTracer mode produces valid SVG from PNG
1. `cd engine && .venv/bin/python -m pytest tests/test_api.py::TestTraceEndpointVtracer -v`
2. **Expected:** 3 tests pass — basic trace returns SVG, metadata present, custom params accepted
### 4. Validation rejects bad input
1. `cd engine && .venv/bin/python -m pytest tests/test_api.py::TestTraceEndpointValidation -v`
2. **Expected:** 7 tests pass — invalid mode (422), unsupported output format (422), invalid params JSON (422), empty file (422), corrupt image (422), defaults to potrace, preset ignored
### 5. Preprocessing pipeline stages are individually testable
1. `cd engine && .venv/bin/python -m pytest tests/test_preprocessing.py -v`
2. **Expected:** 24 tests pass covering decode, grayscale, denoise, CLAHE, threshold, edge detect, morphological ops, and full pipeline composition
### 6. Potrace and VTracer produce different output from same input
1. `cd engine && .venv/bin/python -m pytest tests/test_vectorize.py::TestVtracerVsPotraceComparison -v`
2. **Expected:** 2 tests pass — both produce valid SVG from same input, outputs differ
### 7. API response shape matches contract
1. `cd engine && .venv/bin/python -c "
from fastapi.testclient import TestClient
from main import app
import numpy as np, cv2, json
client = TestClient(app)
img = np.zeros((100,100,3), dtype=np.uint8)
cv2.rectangle(img, (20,20), (80,80), (255,255,255), -1)
_, buf = cv2.imencode('.png', img)
r = client.post('/engine/trace', files={'file': ('test.png', buf.tobytes(), 'image/png')})
data = r.json()
assert r.status_code == 200
assert set(data.keys()) == {'output', 'format', 'metadata'}
assert data['format'] == 'svg'
assert set(data['metadata'].keys()) == {'path_count', 'node_count_total', 'open_paths', 'warnings', 'processing_ms'}
assert isinstance(data['metadata']['processing_ms'], float)
assert '<svg' in data['output']
print('PASS: response shape matches contract')
"
2. **Expected:** Prints "PASS: response shape matches contract", exit code 0
## Edge Cases
### Empty file upload
1. `cd engine && .venv/bin/python -m pytest tests/test_api.py::TestTraceEndpointValidation::test_empty_file -v`
2. **Expected:** Returns 422 with "Uploaded file is empty"
### Corrupt image bytes
1. `cd engine && .venv/bin/python -m pytest tests/test_api.py::TestTraceEndpointValidation::test_corrupt_image -v`
2. **Expected:** Returns 422 with "Preprocessing failed" message
### All-black and all-white images
1. `cd engine && .venv/bin/python -m pytest tests/test_vectorize.py -v -k "all_black or all_white"`
2. **Expected:** 4 tests pass — both Potrace and VTracer handle degenerate inputs without crashing
### Potrace rejects 3D input
1. `cd engine && .venv/bin/python -m pytest tests/test_vectorize.py::TestPotraceEdgeCases::test_rejects_3d_input -v`
2. **Expected:** Passes — potrace_trace raises ValueError for non-2D input
## Failure Signals
- Any test failure in the 76-test suite indicates a regression
- `import potrace` failing means system C libraries are missing (see L001 in KNOWLEDGE.md)
- `import vtracer` failing means the vtracer pip package wasn't installed
- API returning 500 on valid PNG input indicates a pipeline wiring issue
- Metadata with all zeros (path_count=0) on a non-trivial image suggests SVG format changed
## Not Proven By This UAT
- Output visual quality — no human evaluation of SVG fidelity vs source image
- Performance under load — no concurrency or stress testing
- DXF/JSON output formats — deferred to S02
- Preset system behavior — deferred to S03
- Docker containerized deployment — deferred to S03
- Real-world image quality (photos, logos with gradients, low-res scans)
## Notes for Tester
- All tests must be run with `.venv/bin/python -m pytest`, not bare `python` — there is no system-wide Python on PATH.
- Working directory must be `engine/` for imports to resolve correctly.
- Test images are generated programmatically (numpy arrays), no external fixture files needed.
- Processing times in metadata will vary by machine but should be under 100ms for test images.

View file

@ -0,0 +1,24 @@
{
"schemaVersion": 1,
"taskId": "T05",
"unitId": "M001/S01/T05",
"timestamp": 1774498959785,
"passed": false,
"discoverySource": "task-plan",
"checks": [
{
"command": "cd engine",
"exitCode": 0,
"durationMs": 5,
"verdict": "pass"
},
{
"command": "python -m pytest tests/test_api.py -v",
"exitCode": 127,
"durationMs": 4,
"verdict": "fail"
}
],
"retryAttempt": 1,
"maxRetries": 2
}

View file

@ -4,7 +4,7 @@
**Demo:** After this: /engine/trace returns valid DXF and JSON output; /engine/simplify reduces node count on complex SVG **Demo:** After this: /engine/trace returns valid DXF and JSON output; /engine/simplify reduces node count on complex SVG
## Tasks ## Tasks
- [ ] **T01: Post-processing: RDP simplification, island detection, open path repair** — 1. Create engine/pipeline/postprocess.py - [x] **T01: Post-processing: RDP simplification, island detection, open path repair** — 1. Create engine/pipeline/postprocess.py
2. Implement RDP path simplification: 2. Implement RDP path simplification:
- Parse SVG path data into coordinate arrays - Parse SVG path data into coordinate arrays
- Apply Ramer-Douglas-Peucker algorithm with tunable epsilon - Apply Ramer-Douglas-Peucker algorithm with tunable epsilon

View file

@ -0,0 +1,67 @@
---
id: T01
parent: S02
milestone: M001
provides:
- postprocess module with RDP simplification, island detection, open path repair, node counting
- postprocess_svg pipeline function for downstream endpoint wiring
key_files:
- engine/pipeline/postprocess.py
- engine/tests/test_postprocess.py
key_decisions:
- SVG path parsing closes subpaths on Z command by appending start point to coordinate list
- RDP implemented from scratch (no external dependency) — pure Python recursive algorithm
- Island detection uses signed area / winding direction (negative = clockwise = hole)
patterns_established:
- PostProcessResult dataclass as standard pipeline output shape for post-processing stage
observability_surfaces:
- none
duration: 15min
verification_result: passed
completed_at: 2026-03-26
blocker_discovered: false
---
# T01: Post-processing: RDP simplification, island detection, open path repair
**Implemented RDP path simplification, island/hole detection via winding direction, open path detection with auto-close, and node counting — all with 47 passing unit and integration tests**
## What Happened
Created `engine/pipeline/postprocess.py` with the full post-processing pipeline:
1. **SVG path parser** — handles M/L/H/V/C/Q commands (absolute and relative) with proper Z-close semantics that appends start point back to coordinate list
2. **RDP simplification** — recursive Ramer-Douglas-Peucker with tunable epsilon, reduces node count while preserving shape fidelity
3. **Island detection** — uses signed area (shoelace formula) to detect winding direction; clockwise paths (negative area) are flagged as islands/holes
4. **Open path detection** — checks if start/end within tolerance distance; optional auto-close appends start point
5. **Node counting** — per-path and aggregate
6. **`postprocess_svg()`** — full pipeline function that parses SVG, runs all analysis, rebuilds simplified SVG
Created `engine/tests/test_postprocess.py` with 47 tests covering every function individually plus integration tests feeding real Potrace and VTracer output through the pipeline.
## Verification
All 47 tests pass: `cd engine && python -m pytest tests/test_postprocess.py -v`
## Verification Evidence
| # | Command | Exit Code | Verdict | Duration |
|---|---------|-----------|---------|----------|
| 1 | `cd engine && .venv/bin/python -m pytest tests/test_postprocess.py -v` | 0 | pass | 0.17s |
## Diagnostics
None — pure computation module, no runtime processes or external services.
## Deviations
- SVG path parser needed to explicitly close subpaths on Z command (append start point to coordinate list) — not in original plan but required for `is_closed` detection to work correctly on Z-terminated paths.
## Known Issues
None.
## Files Created/Modified
- `engine/pipeline/postprocess.py` — New post-processing module with RDP, island detection, open path repair, node counting
- `engine/tests/test_postprocess.py` — 47 unit and integration tests for all post-processing functions

View file

@ -0,0 +1,414 @@
"""Post-processing pipeline — RDP simplification, island detection, open path repair."""
from __future__ import annotations
import math
import re
import xml.etree.ElementTree as ET
from dataclasses import dataclass, field
@dataclass
class PathInfo:
"""Metadata and geometry for a single SVG path after post-processing."""
original_coords: list[tuple[float, float]]
simplified_coords: list[tuple[float, float]]
is_closed: bool
is_island: bool
node_count: int
original_node_count: int
area: float # signed — negative means clockwise (island/hole)
@dataclass
class PostProcessResult:
"""Aggregated result of running post-processing on an SVG."""
paths: list[PathInfo]
svg: str
total_nodes: int
total_original_nodes: int
open_path_count: int
island_count: int
# ---------------------------------------------------------------------------
# SVG path parsing
# ---------------------------------------------------------------------------
_CMD_RE = re.compile(r"([MmLlHhVvCcSsQqTtAaZz])")
_NUM_RE = re.compile(r"[+-]?(?:\d+\.?\d*|\.\d+)(?:[eE][+-]?\d+)?")
def parse_svg_path(d: str) -> list[list[tuple[float, float]]]:
"""Parse an SVG path `d` attribute into a list of subpaths.
Each subpath is a list of (x, y) coordinate tuples extracted from
M/L/C/Q/Z commands. Curves are sampled at their control points and
endpoints (not flattened), which is sufficient for RDP simplification
and geometric analysis.
Returns:
List of subpaths, each a list of (x, y) tuples.
"""
tokens = _CMD_RE.split(d)
subpaths: list[list[tuple[float, float]]] = []
current: list[tuple[float, float]] = []
cx, cy = 0.0, 0.0
subpath_start: tuple[float, float] | None = None
i = 0
while i < len(tokens):
token = tokens[i].strip()
if not token:
i += 1
continue
if token in ("Z", "z"):
# Close the subpath by appending start point if not already there
if current and subpath_start is not None:
if current[-1] != subpath_start:
current.append(subpath_start)
subpaths.append(current)
current = []
i += 1
continue
if len(token) == 1 and token.isalpha():
cmd = token
i += 1
if i < len(tokens):
nums = [float(n) for n in _NUM_RE.findall(tokens[i])]
else:
nums = []
i += 1
else:
i += 1
continue
if cmd == "M":
if current:
subpaths.append(current)
current = []
j = 0
while j + 1 < len(nums):
cx, cy = nums[j], nums[j + 1]
current.append((cx, cy))
if j == 0:
subpath_start = (cx, cy)
j += 2
elif cmd == "m":
if current:
subpaths.append(current)
current = []
j = 0
while j + 1 < len(nums):
cx += nums[j]
cy += nums[j + 1]
current.append((cx, cy))
if j == 0:
subpath_start = (cx, cy)
j += 2
elif cmd == "L":
j = 0
while j + 1 < len(nums):
cx, cy = nums[j], nums[j + 1]
current.append((cx, cy))
j += 2
elif cmd == "l":
j = 0
while j + 1 < len(nums):
cx += nums[j]
cy += nums[j + 1]
current.append((cx, cy))
j += 2
elif cmd == "H":
for n in nums:
cx = n
current.append((cx, cy))
elif cmd == "h":
for n in nums:
cx += n
current.append((cx, cy))
elif cmd == "V":
for n in nums:
cy = n
current.append((cx, cy))
elif cmd == "v":
for n in nums:
cy += n
current.append((cx, cy))
elif cmd == "C":
j = 0
while j + 5 < len(nums):
# c1, c2, endpoint — keep endpoint for simplification
cx, cy = nums[j + 4], nums[j + 5]
current.append((nums[j], nums[j + 1]))
current.append((nums[j + 2], nums[j + 3]))
current.append((cx, cy))
j += 6
elif cmd == "c":
j = 0
while j + 5 < len(nums):
current.append((cx + nums[j], cy + nums[j + 1]))
current.append((cx + nums[j + 2], cy + nums[j + 3]))
cx += nums[j + 4]
cy += nums[j + 5]
current.append((cx, cy))
j += 6
elif cmd == "Q":
j = 0
while j + 3 < len(nums):
current.append((nums[j], nums[j + 1]))
cx, cy = nums[j + 2], nums[j + 3]
current.append((cx, cy))
j += 4
elif cmd == "q":
j = 0
while j + 3 < len(nums):
current.append((cx + nums[j], cy + nums[j + 1]))
cx += nums[j + 2]
cy += nums[j + 3]
current.append((cx, cy))
j += 4
elif cmd in ("S", "s", "T", "t", "A", "a"):
# Simplified handling — just track endpoint
if nums:
if cmd.isupper():
cx, cy = nums[-2], nums[-1]
else:
cx += nums[-2]
cy += nums[-1]
current.append((cx, cy))
if current:
subpaths.append(current)
return subpaths
# ---------------------------------------------------------------------------
# RDP simplification
# ---------------------------------------------------------------------------
def _perpendicular_distance(
point: tuple[float, float],
line_start: tuple[float, float],
line_end: tuple[float, float],
) -> float:
"""Perpendicular distance from a point to a line segment."""
dx = line_end[0] - line_start[0]
dy = line_end[1] - line_start[1]
length_sq = dx * dx + dy * dy
if length_sq == 0:
return math.hypot(point[0] - line_start[0], point[1] - line_start[1])
num = abs(dy * point[0] - dx * point[1] + line_end[0] * line_start[1] - line_end[1] * line_start[0])
return num / math.sqrt(length_sq)
def rdp_simplify(
coords: list[tuple[float, float]], epsilon: float = 1.0
) -> list[tuple[float, float]]:
"""Apply Ramer-Douglas-Peucker simplification to a coordinate list.
Args:
coords: List of (x, y) tuples.
epsilon: Distance threshold higher values produce simpler paths.
Returns:
Simplified list of (x, y) tuples.
"""
if len(coords) <= 2:
return list(coords)
# Find the point with the maximum distance from the line between first and last
max_dist = 0.0
max_idx = 0
for i in range(1, len(coords) - 1):
dist = _perpendicular_distance(coords[i], coords[0], coords[-1])
if dist > max_dist:
max_dist = dist
max_idx = i
if max_dist > epsilon:
left = rdp_simplify(coords[: max_idx + 1], epsilon)
right = rdp_simplify(coords[max_idx:], epsilon)
return left[:-1] + right
else:
return [coords[0], coords[-1]]
# ---------------------------------------------------------------------------
# Geometric analysis
# ---------------------------------------------------------------------------
def signed_area(coords: list[tuple[float, float]]) -> float:
"""Compute the signed area of a polygon using the shoelace formula.
Positive = counter-clockwise (outer contour in SVG convention).
Negative = clockwise (island / hole).
"""
n = len(coords)
if n < 3:
return 0.0
area = 0.0
for i in range(n):
j = (i + 1) % n
area += coords[i][0] * coords[j][1]
area -= coords[j][0] * coords[i][1]
return area / 2.0
def is_closed(coords: list[tuple[float, float]], tolerance: float = 1.0) -> bool:
"""Check if a path's start and end points are within tolerance distance."""
if len(coords) < 2:
return False
return math.hypot(
coords[-1][0] - coords[0][0], coords[-1][1] - coords[0][1]
) <= tolerance
def close_path(coords: list[tuple[float, float]]) -> list[tuple[float, float]]:
"""Close an open path by appending the start point."""
if not coords:
return coords
if coords[-1] != coords[0]:
return coords + [coords[0]]
return list(coords)
def detect_island(coords: list[tuple[float, float]]) -> bool:
"""Detect if a closed path is an island (hole) based on winding direction.
In SVG convention with fill-rule="evenodd", clockwise paths (negative
signed area) represent holes/islands inside counter-clockwise outer contours.
"""
return signed_area(coords) < 0
# ---------------------------------------------------------------------------
# Node counting
# ---------------------------------------------------------------------------
def node_count(coords: list[tuple[float, float]]) -> int:
"""Return the number of unique nodes in a path."""
return len(coords)
# ---------------------------------------------------------------------------
# Full post-processing pipeline
# ---------------------------------------------------------------------------
def postprocess_svg(
svg_str: str,
epsilon: float = 1.0,
close_tolerance: float = 1.0,
auto_close: bool = False,
) -> PostProcessResult:
"""Run the full post-processing pipeline on an SVG string.
1. Parse SVG path data
2. Apply RDP simplification with given epsilon
3. Detect islands (clockwise winding)
4. Detect and optionally repair open paths
5. Count nodes per path
Args:
svg_str: Input SVG string.
epsilon: RDP simplification tolerance.
close_tolerance: Distance threshold for considering a path closed.
auto_close: If True, append start point to open paths.
Returns:
PostProcessResult with per-path metadata and rebuilt SVG.
"""
root = ET.fromstring(svg_str)
ns = {"svg": "http://www.w3.org/2000/svg"}
path_infos: list[PathInfo] = []
for path_el in root.findall("svg:path", ns) or root.findall("path"):
d = path_el.get("d", "")
if not d.strip():
continue
subpaths = parse_svg_path(d)
for coords in subpaths:
if len(coords) < 2:
continue
original_count = node_count(coords)
simplified = rdp_simplify(coords, epsilon)
closed = is_closed(simplified, close_tolerance)
if auto_close and not closed:
simplified = close_path(simplified)
closed = True
island = detect_island(simplified) if closed and len(simplified) >= 3 else False
area = signed_area(simplified)
path_infos.append(
PathInfo(
original_coords=coords,
simplified_coords=simplified,
is_closed=closed,
is_island=island,
node_count=node_count(simplified),
original_node_count=original_count,
area=area,
)
)
# Rebuild SVG with simplified paths
rebuilt_svg = _rebuild_svg(root, path_infos)
total_nodes = sum(p.node_count for p in path_infos)
total_original = sum(p.original_node_count for p in path_infos)
open_count = sum(1 for p in path_infos if not p.is_closed)
island_count = sum(1 for p in path_infos if p.is_island)
return PostProcessResult(
paths=path_infos,
svg=rebuilt_svg,
total_nodes=total_nodes,
total_original_nodes=total_original,
open_path_count=open_count,
island_count=island_count,
)
def _rebuild_svg(root: ET.Element, path_infos: list[PathInfo]) -> str:
"""Rebuild SVG string from post-processed path data."""
width = root.get("width", "100")
height = root.get("height", "100")
viewbox = root.get("viewBox", f"0 0 {width} {height}")
path_parts = []
for info in path_infos:
if len(info.simplified_coords) < 2:
continue
d_parts = []
x0, y0 = info.simplified_coords[0]
d_parts.append(f"M {x0:.3f},{y0:.3f}")
for x, y in info.simplified_coords[1:]:
d_parts.append(f"L {x:.3f},{y:.3f}")
if info.is_closed:
d_parts.append("Z")
path_parts.append(" ".join(d_parts))
d = " ".join(path_parts)
return (
f'<svg xmlns="http://www.w3.org/2000/svg" '
f'width="{width}" height="{height}" '
f'viewBox="{viewbox}">'
f'<path d="{d}" fill="black" fill-rule="evenodd" stroke="none"/>'
f"</svg>"
)

View file

@ -0,0 +1,375 @@
"""Tests for the post-processing pipeline (RDP, island detection, open path repair)."""
import math
import pytest
from pipeline.postprocess import (
PostProcessResult,
close_path,
detect_island,
is_closed,
node_count,
parse_svg_path,
postprocess_svg,
rdp_simplify,
signed_area,
)
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _make_svg(d: str, width: int = 100, height: int = 100) -> str:
"""Build a minimal SVG string with the given path data."""
return (
f'<svg xmlns="http://www.w3.org/2000/svg" '
f'width="{width}" height="{height}" '
f'viewBox="0 0 {width} {height}">'
f'<path d="{d}" fill="black" fill-rule="evenodd" stroke="none"/>'
f"</svg>"
)
# A simple closed square: 0,0 → 100,0 → 100,100 → 0,100 → close
SQUARE_D = "M 0,0 L 100,0 L 100,100 L 0,100 Z"
# A triangle
TRIANGLE_D = "M 50,0 L 100,100 L 0,100 Z"
# An open path (no Z, endpoints differ)
OPEN_D = "M 0,0 L 50,50 L 100,0"
# Clockwise square (island/hole) — opposite winding from SQUARE_D
CW_SQUARE_D = "M 0,0 L 0,100 L 100,100 L 100,0 Z"
# ---------------------------------------------------------------------------
# SVG path parsing
# ---------------------------------------------------------------------------
class TestParseSvgPath:
def test_simple_move_and_lines(self):
subpaths = parse_svg_path("M 0,0 L 10,0 L 10,10 Z")
assert len(subpaths) == 1
assert subpaths[0][0] == (0.0, 0.0)
assert subpaths[0][1] == (10.0, 0.0)
assert subpaths[0][2] == (10.0, 10.0)
def test_multiple_subpaths(self):
subpaths = parse_svg_path("M 0,0 L 10,10 Z M 20,20 L 30,30 Z")
assert len(subpaths) == 2
def test_cubic_bezier(self):
subpaths = parse_svg_path("M 0,0 C 10,20 30,40 50,60 Z")
assert len(subpaths) == 1
coords = subpaths[0]
assert len(coords) >= 2
# Endpoint (50, 60) should be present; last point is (0,0) from Z close
assert (50.0, 60.0) in coords
assert coords[-1] == (0.0, 0.0) # Z closes back to start
def test_relative_lineto(self):
subpaths = parse_svg_path("M 10,10 l 5,0 l 0,5 Z")
assert len(subpaths) == 1
assert subpaths[0][0] == (10.0, 10.0)
assert subpaths[0][1] == (15.0, 10.0)
assert subpaths[0][2] == (15.0, 15.0)
def test_horizontal_vertical(self):
subpaths = parse_svg_path("M 0,0 H 10 V 10 Z")
assert len(subpaths) == 1
assert (10.0, 0.0) in subpaths[0]
assert (10.0, 10.0) in subpaths[0]
def test_empty_path(self):
subpaths = parse_svg_path("")
assert subpaths == []
def test_move_only(self):
subpaths = parse_svg_path("M 5,5")
assert len(subpaths) == 1
assert subpaths[0] == [(5.0, 5.0)]
def test_quadratic_bezier(self):
subpaths = parse_svg_path("M 0,0 Q 50,100 100,0 Z")
assert len(subpaths) == 1
coords = subpaths[0]
assert (100.0, 0.0) in coords
assert coords[-1] == (0.0, 0.0) # Z closes back to start
# ---------------------------------------------------------------------------
# RDP simplification
# ---------------------------------------------------------------------------
class TestRdpSimplify:
def test_collinear_points_reduced(self):
"""Points along a straight line should be reduced to just endpoints."""
coords = [(0, 0), (1, 1), (2, 2), (3, 3), (4, 4)]
result = rdp_simplify(coords, epsilon=0.1)
assert len(result) == 2
assert result[0] == (0, 0)
assert result[-1] == (4, 4)
def test_preserves_corners(self):
"""A right angle should be preserved even with simplification."""
coords = [(0, 0), (10, 0), (10, 10)]
result = rdp_simplify(coords, epsilon=0.5)
assert len(result) == 3
def test_epsilon_zero_preserves_all(self):
"""Epsilon=0 should keep all points."""
coords = [(0, 0), (5, 1), (10, 0)]
result = rdp_simplify(coords, epsilon=0.0)
assert len(result) == 3
def test_high_epsilon_aggressive(self):
"""High epsilon should aggressively simplify."""
coords = [(0, 0), (5, 0.5), (10, 0), (15, 0.3), (20, 0)]
result = rdp_simplify(coords, epsilon=10.0)
assert len(result) == 2
def test_two_points_unchanged(self):
coords = [(0, 0), (10, 10)]
result = rdp_simplify(coords, epsilon=1.0)
assert result == [(0, 0), (10, 10)]
def test_single_point_unchanged(self):
coords = [(5, 5)]
result = rdp_simplify(coords, epsilon=1.0)
assert result == [(5, 5)]
def test_empty_input(self):
result = rdp_simplify([], epsilon=1.0)
assert result == []
def test_reduces_node_count(self):
"""A complex path should have fewer nodes after simplification."""
# Approximate a circle with many points
n = 100
coords = [
(50 + 40 * math.cos(2 * math.pi * i / n),
50 + 40 * math.sin(2 * math.pi * i / n))
for i in range(n)
]
result = rdp_simplify(coords, epsilon=2.0)
assert len(result) < len(coords)
assert len(result) >= 3 # must retain at least a polygon
# ---------------------------------------------------------------------------
# Signed area / winding detection
# ---------------------------------------------------------------------------
class TestSignedArea:
def test_ccw_square_positive(self):
"""Counter-clockwise square should have positive area."""
coords = [(0, 0), (100, 0), (100, 100), (0, 100)]
assert signed_area(coords) > 0
def test_cw_square_negative(self):
"""Clockwise square should have negative area."""
coords = [(0, 0), (0, 100), (100, 100), (100, 0)]
assert signed_area(coords) < 0
def test_area_magnitude(self):
"""Area of a 10x10 square should be 100."""
coords = [(0, 0), (10, 0), (10, 10), (0, 10)]
assert abs(signed_area(coords)) == pytest.approx(100.0)
def test_degenerate_line(self):
"""Two points have zero area."""
assert signed_area([(0, 0), (10, 10)]) == 0.0
def test_single_point(self):
assert signed_area([(0, 0)]) == 0.0
def test_empty(self):
assert signed_area([]) == 0.0
# ---------------------------------------------------------------------------
# Island detection
# ---------------------------------------------------------------------------
class TestDetectIsland:
def test_ccw_is_not_island(self):
coords = [(0, 0), (100, 0), (100, 100), (0, 100)]
assert detect_island(coords) is False
def test_cw_is_island(self):
coords = [(0, 0), (0, 100), (100, 100), (100, 0)]
assert detect_island(coords) is True
# ---------------------------------------------------------------------------
# Open path detection + repair
# ---------------------------------------------------------------------------
class TestIsClosed:
def test_closed_path(self):
coords = [(0, 0), (10, 0), (10, 10), (0, 0)]
assert is_closed(coords) is True
def test_open_path(self):
coords = [(0, 0), (10, 0), (10, 10)]
assert is_closed(coords) is False
def test_nearly_closed(self):
"""Path within tolerance should count as closed."""
coords = [(0, 0), (10, 0), (10, 10), (0.5, 0.3)]
assert is_closed(coords, tolerance=1.0) is True
def test_single_point(self):
assert is_closed([(0, 0)]) is False
def test_empty(self):
assert is_closed([]) is False
class TestClosePath:
def test_closes_open_path(self):
coords = [(0, 0), (10, 0), (10, 10)]
result = close_path(coords)
assert result[-1] == result[0]
assert len(result) == 4
def test_already_closed(self):
coords = [(0, 0), (10, 0), (10, 10), (0, 0)]
result = close_path(coords)
assert len(result) == 4 # no duplicate added
def test_empty(self):
assert close_path([]) == []
# ---------------------------------------------------------------------------
# Node counting
# ---------------------------------------------------------------------------
class TestNodeCount:
def test_counts_nodes(self):
assert node_count([(0, 0), (1, 1), (2, 2)]) == 3
def test_empty(self):
assert node_count([]) == 0
# ---------------------------------------------------------------------------
# Full pipeline integration
# ---------------------------------------------------------------------------
class TestPostprocessSvg:
def test_returns_result_object(self):
svg = _make_svg(SQUARE_D)
result = postprocess_svg(svg)
assert isinstance(result, PostProcessResult)
def test_path_count(self):
svg = _make_svg(SQUARE_D)
result = postprocess_svg(svg)
assert len(result.paths) >= 1
def test_node_count_reduction(self):
"""Simplification should reduce or maintain node count."""
svg = _make_svg(SQUARE_D)
result = postprocess_svg(svg, epsilon=0.5)
for path in result.paths:
assert path.node_count <= path.original_node_count
def test_total_nodes_tracked(self):
svg = _make_svg(SQUARE_D)
result = postprocess_svg(svg)
assert result.total_nodes == sum(p.node_count for p in result.paths)
def test_closed_path_detected(self):
svg = _make_svg(SQUARE_D)
result = postprocess_svg(svg)
# Square with Z should be detected as closed
assert any(p.is_closed for p in result.paths)
def test_open_path_detected(self):
svg = _make_svg(OPEN_D)
result = postprocess_svg(svg)
assert result.open_path_count >= 1
def test_auto_close(self):
svg = _make_svg(OPEN_D)
result = postprocess_svg(svg, auto_close=True)
# After auto-close, no open paths should remain
assert result.open_path_count == 0
def test_island_detection(self):
# Combine an outer CCW path with an inner CW path
combined_d = f"{SQUARE_D} {CW_SQUARE_D}"
svg = _make_svg(combined_d)
result = postprocess_svg(svg)
assert result.island_count >= 1
def test_output_svg_is_well_formed(self):
svg = _make_svg(SQUARE_D)
result = postprocess_svg(svg)
import xml.etree.ElementTree as ET
root = ET.fromstring(result.svg)
assert root.tag == "{http://www.w3.org/2000/svg}svg"
def test_output_svg_has_path(self):
svg = _make_svg(SQUARE_D)
result = postprocess_svg(svg)
import xml.etree.ElementTree as ET
root = ET.fromstring(result.svg)
ns = {"svg": "http://www.w3.org/2000/svg"}
paths = root.findall("svg:path", ns)
assert len(paths) >= 1
def test_epsilon_affects_simplification(self):
"""Higher epsilon should produce fewer or equal nodes."""
# Build a complex path
n = 50
points = " ".join(
f"L {50 + 40 * math.cos(2 * math.pi * i / n):.3f},"
f"{50 + 40 * math.sin(2 * math.pi * i / n):.3f}"
for i in range(1, n)
)
x0 = 50 + 40 * math.cos(0)
y0 = 50 + 40 * math.sin(0)
d = f"M {x0:.3f},{y0:.3f} {points} Z"
svg = _make_svg(d)
result_low = postprocess_svg(svg, epsilon=0.1)
result_high = postprocess_svg(svg, epsilon=10.0)
assert result_high.total_nodes <= result_low.total_nodes
class TestPostprocessWithVectorizerOutput:
"""Integration test — feed real vectorizer SVG through post-processing."""
def test_potrace_output(self):
"""Post-process real Potrace output."""
import numpy as np
from pipeline.vectorize import potrace_trace
img = np.zeros((100, 100), dtype=np.uint8)
img[20:80, 20:80] = 255
svg = potrace_trace(img)
result = postprocess_svg(svg, epsilon=1.0)
assert isinstance(result, PostProcessResult)
assert len(result.paths) >= 1
assert result.total_nodes > 0
def test_vtracer_output(self):
"""Post-process real VTracer output."""
import numpy as np
from pipeline.vectorize import vtracer_trace
img = np.zeros((100, 100), dtype=np.uint8)
img[20:80, 20:80] = 255
svg = vtracer_trace(img)
result = postprocess_svg(svg, epsilon=1.0)
assert isinstance(result, PostProcessResult)
assert len(result.paths) >= 1
assert result.total_nodes > 0