
etl-core-patterns (verified)

Core ETL reliability patterns including idempotency, checkpointing, error handling, chunking, retry logic, and logging.

Marketplace: majestic-marketplace (majesticlabs-dev/majestic-marketplace)
Plugin: majestic-data
Repository: majesticlabs-dev/majestic-marketplace (19 stars)
Path: plugins/majestic-data/skills/etl-core-patterns/SKILL.md
Last Verified: January 24, 2026

Install Skill

npx add-skill https://github.com/majesticlabs-dev/majestic-marketplace/blob/main/plugins/majestic-data/skills/etl-core-patterns/SKILL.md -a claude-code --skill etl-core-patterns

Installation paths:

Claude: .claude/skills/etl-core-patterns/

Instructions

# ETL Core Patterns

Reliability patterns for production data pipelines.

## Idempotency Patterns

```python
# Assumes `engine`, `session`, `MyTable`, and `update_cols` are defined elsewhere.
import pandas as pd
from more_itertools import chunked  # any batching helper works here
from sqlalchemy import text
from sqlalchemy.dialects.postgresql import insert  # provides ON CONFLICT upserts

# Pattern 1: Delete-then-insert (simple, works for small datasets)
def load_daily_data(date: str, df: pd.DataFrame) -> None:
    with engine.begin() as conn:
        conn.execute(
            text("DELETE FROM daily_metrics WHERE date = :date"),
            {"date": date}
        )
        df.to_sql('daily_metrics', conn, if_exists='append', index=False)

# Pattern 2: UPSERT (better for large datasets)
def upsert_records(df: pd.DataFrame) -> None:
    for batch in chunked(df.to_dict('records'), 1000):
        stmt = insert(MyTable).values(batch)
        stmt = stmt.on_conflict_do_update(
            index_elements=['id'],
            set_={col: stmt.excluded[col] for col in update_cols}
        )
        session.execute(stmt)

# Pattern 3: Source hash for change detection
def extract_with_hash(df: pd.DataFrame) -> pd.DataFrame:
    hash_cols = ['id', 'name', 'value', 'updated_at']
    # index=False hashes row contents only, so hashes stay stable across re-extracts
    df['_row_hash'] = pd.util.hash_pandas_object(df[hash_cols], index=False)
    return df
```
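
The row hash from Pattern 3 is most useful when compared against hashes already stored in the target, so that only new or changed rows are upserted. A minimal sketch of that comparison, reusing `extract_with_hash` and `upsert_records` from above and assuming a hypothetical `load_existing_hashes()` helper that returns `{id: _row_hash}` for previously loaded rows:

```python
def load_changed_rows(df: pd.DataFrame) -> None:
    df = extract_with_hash(df)
    # Hypothetical helper: {id: _row_hash} for rows already in the target table.
    existing = load_existing_hashes()
    # Keep rows whose id is new or whose hash differs from the stored one.
    changed = df[df['id'].map(existing).ne(df['_row_hash'])]
    if not changed.empty:
        upsert_records(changed)
```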

## Checkpointing

```python
import json
from pathlib import Path

class Checkpoint:
    def __init__(self, path: str):
        self.path = Path(path)
        self.state = self._load()

    def _load(self) -> dict:
        if self.path.exists():
            return json.loads(self.path.read_text())
        return {}

    def save(self) -> None:
        self.path.write_text(json.dumps(self.state, default=str))

    def get_last_processed(self, key: str) -> str | None:
        return self.state.get(key)

    def set_last_processed(self, key: str, value: str) -> None:
        self.state[key] = value
        self.save()

# Usage
checkpoint = Checkpoint('.etl_checkpoint.json')
last_id = checkpoint.get_last_processed('users_sync')

for batch in fetch_users_since(last_id):
    process(batch)
    checkpoint.set_last_processed('users_sync', batch[-1]['id'])
```
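
A crash partway through `save()` can leave the checkpoint file truncated. A common hardening step is to write the JSON to a temporary file and then swap it into place with `os.replace`, which is atomic when both paths are on the same filesystem. A minimal sketch of a crash-safer save step along those lines (not part of the original class):

```python
import json
import os
from pathlib import Path

def save_atomic(path: Path, state: dict) -> None:
    # Write to a sibling temp file, then atomically rename it over the original
    # so readers never observe a partially written checkpoint.
    tmp_path = path.with_name(path.name + '.tmp')
    tmp_path.write_text(json.dumps(state, default=str))
    os.replace(tmp_path, path)
```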

## Error Handling

```python
from dataclasses import dataclass
```
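
A minimal sketch of a dead-letter style error-handling pattern built on dataclasses: each record is processed individually, and failures are collected and logged rather than aborting the whole batch. The `FailedRecord`, `LoadResult`, and `process_record` names here are illustrative assumptions, not taken from the original skill:

```python
import logging
from dataclasses import dataclass, field

logger = logging.getLogger(__name__)

@dataclass
class FailedRecord:
    record: dict
    error: str

@dataclass
class LoadResult:
    succeeded: int = 0
    failures: list[FailedRecord] = field(default_factory=list)

def load_with_error_capture(records: list[dict]) -> LoadResult:
    result = LoadResult()
    for record in records:
        try:
            process_record(record)  # illustrative per-record load step
            result.succeeded += 1
        except Exception as exc:
            # Record the failure and keep going instead of failing the batch.
            logger.warning("Failed to load record %s: %s", record.get('id'), exc)
            result.failures.append(FailedRecord(record=record, error=str(exc)))
    return result
```

Collected failures can then be written to a dead-letter table or file for later inspection and replay.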

Validation Details

- Front Matter
- Required Fields
- Valid Name Format
- Valid Description
- Has Sections
- Allowed Tools
- Instruction Length: 4580 chars