Core ETL reliability patterns including idempotency, checkpointing, error handling, chunking, retry logic, and logging.
Repository: majesticlabs-dev/majestic-marketplace (plugin: majestic-data)
January 24, 2026

Install with:

```shell
npx add-skill https://github.com/majesticlabs-dev/majestic-marketplace/blob/main/plugins/majestic-data/skills/etl-core-patterns/SKILL.md -a claude-code --skill etl-core-patterns
```

Installation path: `.claude/skills/etl-core-patterns/`

# ETL Core Patterns
Reliability patterns for production data pipelines.
## Idempotency Patterns
```python
import pandas as pd
from more_itertools import chunked
from sqlalchemy import text
from sqlalchemy.dialects.postgresql import insert

# `engine`, `session`, `MyTable`, and `update_cols` are assumed to be
# defined elsewhere in your pipeline module.

# Pattern 1: Delete-then-insert (simple, works for small datasets)
def load_daily_data(date: str, df: pd.DataFrame) -> None:
    with engine.begin() as conn:  # one transaction: delete + insert commit together
        conn.execute(
            text("DELETE FROM daily_metrics WHERE date = :date"),
            {"date": date},
        )
        df.to_sql('daily_metrics', conn, if_exists='append', index=False)

# Pattern 2: UPSERT (better for large datasets; PostgreSQL ON CONFLICT shown)
def upsert_records(df: pd.DataFrame) -> None:
    for batch in chunked(df.to_dict('records'), 1000):
        stmt = insert(MyTable).values(batch)
        stmt = stmt.on_conflict_do_update(
            index_elements=['id'],
            set_={col: stmt.excluded[col] for col in update_cols},
        )
        session.execute(stmt)
    session.commit()

# Pattern 3: Source hash for change detection
def extract_with_hash(df: pd.DataFrame) -> pd.DataFrame:
    hash_cols = ['id', 'name', 'value', 'updated_at']
    # index=False so the hash depends only on column values, not row position
    df['_row_hash'] = pd.util.hash_pandas_object(df[hash_cols], index=False)
    return df
```
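Pattern 3 pays off at load time: hash the same columns on both the previous and the current snapshot, then load only rows whose hash changed. A minimal, self-contained sketch (the `changed_rows` helper and the sample frames are illustrative, not part of the skill itself):

```python
import pandas as pd

def changed_rows(old: pd.DataFrame, new: pd.DataFrame, hash_cols: list[str]) -> pd.DataFrame:
    """Return rows of `new` whose content hash differs from `old`, keyed by 'id'.
    Rows absent from `old` count as changed (their old hash is missing)."""
    old = old.copy()
    new = new.copy()
    old["_row_hash"] = pd.util.hash_pandas_object(old[hash_cols], index=False)
    new["_row_hash"] = pd.util.hash_pandas_object(new[hash_cols], index=False)
    merged = new.merge(old[["id", "_row_hash"]], on="id", how="left", suffixes=("", "_old"))
    return merged[merged["_row_hash"] != merged["_row_hash_old"]]

old = pd.DataFrame({"id": [1, 2], "name": ["a", "b"], "value": [10, 20]})
new = pd.DataFrame({"id": [1, 2], "name": ["a", "B"], "value": [10, 20]})
print(changed_rows(old, new, ["id", "name", "value"])["id"].tolist())  # → [2]
```

Only row 2 is emitted, because only its `name` column differs between snapshots; unchanged rows never touch the target table.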
## Checkpointing
```python
import json
from pathlib import Path

class Checkpoint:
    def __init__(self, path: str):
        self.path = Path(path)
        self.state = self._load()

    def _load(self) -> dict:
        if self.path.exists():
            return json.loads(self.path.read_text())
        return {}

    def save(self) -> None:
        self.path.write_text(json.dumps(self.state, default=str))

    def get_last_processed(self, key: str) -> str | None:
        return self.state.get(key)

    def set_last_processed(self, key: str, value: str) -> None:
        self.state[key] = value
        self.save()

# Usage
checkpoint = Checkpoint('.etl_checkpoint.json')
last_id = checkpoint.get_last_processed('users_sync')
for batch in fetch_users_since(last_id):
    process(batch)
    checkpoint.set_last_processed('users_sync', batch[-1]['id'])
```
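One caveat with the JSON checkpoint above: `write_text` is not crash-safe, so a failure mid-write can leave a truncated file and lose your position. A common hardening is to write to a temp file and `os.replace` it into place; here is a sketch (the `atomic_save` helper is illustrative, not part of the skill):

```python
import json
import os
import tempfile
from pathlib import Path

def atomic_save(path: Path, state: dict) -> None:
    """Persist checkpoint state via temp-file-then-rename so a crash
    mid-write never leaves a truncated checkpoint behind."""
    fd, tmp = tempfile.mkstemp(dir=path.parent, suffix=".tmp")
    try:
        with os.fdopen(fd, "w") as f:
            f.write(json.dumps(state, default=str))
        os.replace(tmp, path)  # atomic rename on POSIX filesystems
    finally:
        if os.path.exists(tmp):
            os.unlink(tmp)  # clean up the temp file if the write failed

# Demo in a throwaway directory
ckpt = Path(tempfile.mkdtemp()) / "checkpoint.json"
atomic_save(ckpt, {"users_sync": "12345"})
print(json.loads(ckpt.read_text()))  # {'users_sync': '12345'}
```

Dropping `atomic_save` into `Checkpoint.save` means a pipeline killed mid-checkpoint restarts from the last fully written state instead of crashing on corrupt JSON.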
## Error Handling
```python
from dataclasses import dataclass