Use when writing Pivot pipeline stages, seeing annotation errors (Dep, Out, Annotated), loader mismatches, "cannot pickle" errors, DirectoryOut validation failures, or IncrementalOut path mismatches
February 2, 2026
npx add-skill https://github.com/sjawhar/pivot/blob/main/skills/writing-pivot-stages/SKILL.md -a claude-code --skill writing-pivot-stages
Installation paths:
.claude/skills/writing-pivot-stages/
# Writing Pivot Stages
## Overview
Pivot stages are pure Python functions declaring file I/O via type annotations. The framework handles loading, saving, caching, and DAG construction.
**Core principle:** Annotations handle all file I/O. Functions receive pre-loaded data and return data to be saved.
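To make the principle concrete, here is a minimal sketch of how a framework can discover file I/O from annotations alone, using only the standard library. The `Dep` and `Out` classes below are hypothetical stand-ins defined for this example, not the real `pivot.outputs` classes, and `describe_io` is an illustrative helper rather than anything Pivot exposes.

```python
from dataclasses import dataclass
from typing import Annotated, get_type_hints


@dataclass(frozen=True)
class Dep:
    """Hypothetical stand-in for a dependency marker (not pivot.outputs.Dep)."""
    path: str


@dataclass(frozen=True)
class Out:
    """Hypothetical stand-in for an output marker (not pivot.outputs.Out)."""
    path: str


def describe_io(func):
    """Return (deps, outs) declared in func's annotations as {name: path} dicts."""
    # include_extras=True preserves the Annotated[...] metadata.
    hints = get_type_hints(func, include_extras=True)
    deps, outs = {}, {}
    for name, hint in hints.items():
        # Annotated hints expose their extra arguments via __metadata__.
        for meta in getattr(hint, "__metadata__", ()):
            if isinstance(meta, Dep):
                deps[name] = meta.path
            elif isinstance(meta, Out):
                outs[name] = meta.path
    return deps, outs


def transform(
    rows: Annotated[list[dict], Dep("input.csv")],
) -> Annotated[list[dict], Out("output.csv")]:
    # The body only sees in-memory data; it never opens a file itself.
    return [row for row in rows if row]


deps, outs = describe_io(transform)
```

Because the function body never touches the filesystem, it can be called directly with in-memory data in a unit test, while the annotations carry everything needed to wire up loading and saving.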
## Imports
```python
from typing import Annotated, TypedDict
from pivot.outputs import Dep, Out, Metric, Plot, PlaceholderDep, IncrementalOut, DirectoryOut
from pivot.loaders import CSV, JSON, JSONL, YAML, Text, Pickle, PathOnly, MatplotlibFigure
from pivot.loaders import Reader, Writer, Loader # Base classes for custom loaders
from pivot.stage_def import StageParams
from pivot.pipeline import Pipeline
```
## Stage Anatomy
```python
import pandas as pd

class MyParams(StageParams):
    threshold: float = 0.5

class MyOutputs(TypedDict):
    result: Annotated[pd.DataFrame, Out("output.csv", CSV())]
    metrics: Annotated[dict, Metric("metrics.json")]

def my_stage(
    params: MyParams,
    data: Annotated[pd.DataFrame, Dep("input.csv", CSV())],
) -> MyOutputs:
    # Keep only rows whose score exceeds the configured threshold.
    filtered = data[data["score"] > params.threshold]
    return {"result": filtered, "metrics": {"count": len(filtered)}}

pipeline = Pipeline("my_pipeline")
pipeline.register(my_stage, params=MyParams(threshold=0.3))
```
**Single output:** Annotate return directly instead of TypedDict:
```python
def transform(
    data: Annotated[pd.DataFrame, Dep("input.csv", CSV())],
) -> Annotated[pd.DataFrame, Out("output.csv", CSV())]:
    return data.dropna()
```
## Loader Hierarchy
The loader system has three base classes:
| Base Class | Methods | Use Case |
|------------|---------|----------|
| `Reader[R]` | `load() -> R` | Read-only (dependencies) |
| `Writer[W]` | `save(data: W, ...)` | Write-only (outputs) |
| `Loader[W, R]` | Both `load()` and `save()` | Bidirectional (incremental outputs) |
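The relationship between the three base classes in the table can be sketched with plain Python generics. Everything below is a self-contained illustration under stated assumptions: the `Reader`, `Writer`, and `Loader` classes are local stand-ins rather than imports from `pivot.loaders`, the `path` argument on `load()` and `save()` is assumed (the table does not spell out the full signatures), and `JSONLinesLoader` is a hypothetical name.

```python
import json
import tempfile
from abc import ABC, abstractmethod
from pathlib import Path
from typing import Generic, TypeVar

R = TypeVar("R")  # type produced by load()
W = TypeVar("W")  # type accepted by save()


class Reader(ABC, Generic[R]):
    """Stand-in read-only base; the path parameter is an assumption."""

    @abstractmethod
    def load(self, path: Path) -> R: ...


class Writer(ABC, Generic[W]):
    """Stand-in write-only base; the path parameter is an assumption."""

    @abstractmethod
    def save(self, data: W, path: Path) -> None: ...


class Loader(Reader[R], Writer[W], Generic[W, R]):
    """Stand-in bidirectional base: anything that can both load and save."""


class JSONLinesLoader(Loader[list[dict], list[dict]]):
    """Hypothetical custom loader storing one JSON object per line."""

    def load(self, path: Path) -> list[dict]:
        with path.open() as f:
            return [json.loads(line) for line in f if line.strip()]

    def save(self, data: list[dict], path: Path) -> None:
        with path.open("w") as f:
            for record in data:
                f.write(json.dumps(record) + "\n")


# Round trip: a Loader must read back what it wrote.
with tempfile.TemporaryDirectory() as tmp:
    target = Path(tmp) / "records.jsonl"
    loader = JSONLinesLoader()
    loader.save([{"id": 1}, {"id": 2}], target)
    round_tripped = loader.load(target)
```

In this sketch a `Loader` instance satisfies both `isinstance(loader, Reader)` and `isinstance(loader, Writer)`, so a bidirectional loader can be passed wherever a read-only or write-only one is expected.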
**Type constraints:**
- `Dep.loader` accepts `Reader[R]` (or `Loader`, which extends `Reader`)
- `Out.loader` accepts `Writer[W]` (or `