
etl-incremental-patterns

verified

Incremental data loading patterns including backfill strategies, CDC, timestamp-based loads, and pipeline orchestration.


Marketplace

majestic-marketplace

majesticlabs-dev/majestic-marketplace

Plugin

majestic-data

Repository

majesticlabs-dev/majestic-marketplace
19 stars

plugins/majestic-data/skills/etl-incremental-patterns/SKILL.md

Last Verified

January 24, 2026

Install Skill

npx add-skill https://github.com/majesticlabs-dev/majestic-marketplace/blob/main/plugins/majestic-data/skills/etl-incremental-patterns/SKILL.md -a claude-code --skill etl-incremental-patterns

Installation paths:

Claude
.claude/skills/etl-incremental-patterns/

Instructions

# ETL Incremental Patterns

Patterns for incremental data loading and backfill operations.

## Backfill Strategy

```python
from collections.abc import Callable
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import date, timedelta

def backfill_date_range(
    start: date,
    end: date,
    process_fn: Callable[[date], None],
    parallel: int = 4
) -> None:
    """Backfill an inclusive date range, calling process_fn once per day."""
    dates = []
    current = start
    while current <= end:
        dates.append(current)
        current += timedelta(days=1)

    # Process in parallel with controlled concurrency
    with ThreadPoolExecutor(max_workers=parallel) as executor:
        futures = {executor.submit(process_fn, d): d for d in dates}
        for future in as_completed(futures):
            d = futures[future]
            try:
                future.result()
                print(f"Completed: {d}")
            except Exception as e:
                print(f"Failed: {d} - {e}")

# Usage (process_daily_data is any callable that takes a single date)
backfill_date_range(
    start=date(2024, 1, 1),
    end=date(2024, 3, 31),
    process_fn=process_daily_data,
    parallel=4
)
```

## Incremental Load Patterns

### Timestamp-Based Incremental

```python
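import pandas as pd
from sqlalchemy import text

# engine, get_last_run_timestamp and set_last_run_timestamp are assumed to be defined elsewhere
def incremental_by_timestamp(table: str, timestamp_col: str) -> pd.DataFrame:
    last_run = get_last_run_timestamp(table)
    # table and timestamp_col are interpolated directly, so they must come from trusted config
    query = text(f"""
        SELECT * FROM {table}
        WHERE {timestamp_col} > :last_run
        ORDER BY {timestamp_col}
    """)
    df = pd.read_sql(query, engine, params={'last_run': last_run})
    if not df.empty:
        # Advance the watermark only when new rows were actually read
        set_last_run_timestamp(table, df[timestamp_col].max())
    return df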
```
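
The function above relies on `get_last_run_timestamp` and `set_last_run_timestamp`, which the skill does not define. One possible sketch keeps a watermark per source table in a small SQLite state table. The `etl_watermarks` table, the in-memory `state` connection, and the epoch default are assumptions made for illustration.

```python
import sqlite3
from datetime import datetime

# Hypothetical state store: one watermark row per source table.
state = sqlite3.connect(":memory:")
state.execute(
    "CREATE TABLE IF NOT EXISTS etl_watermarks ("
    "table_name TEXT PRIMARY KEY, last_run TEXT NOT NULL)"
)

# Assumed default for a table that has never been loaded: read everything.
EPOCH = datetime(1970, 1, 1)

def get_last_run_timestamp(table: str) -> datetime:
    """Return the stored watermark, or EPOCH if none exists yet."""
    row = state.execute(
        "SELECT last_run FROM etl_watermarks WHERE table_name = ?", (table,)
    ).fetchone()
    return datetime.fromisoformat(row[0]) if row else EPOCH

def set_last_run_timestamp(table: str, ts: datetime) -> None:
    """Store the watermark as ISO 8601 text, replacing any previous value."""
    state.execute(
        "INSERT OR REPLACE INTO etl_watermarks (table_name, last_run) VALUES (?, ?)",
        (table, ts.isoformat()),
    )
    state.commit()
```

In a real pipeline the state table would live in a persistent database rather than `:memory:`, so the watermark survives between runs.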

### Change Data Capture (CDC)

```python
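# soft_delete and upsert are assumed to be defined elsewhere
def process_cdc_events(events: list[dict]) -> None:
    """Apply CDC events in the order they are received."""
    for event in events:
        op = event['operation']  # INSERT, UPDATE, DELETE
        data = event['data']

        if op == 'DELETE':
            # Mark the row as deleted instead of physically removing it
            soft_delete(data['id'])
        else:
            # INSERT and UPDATE share a single upsert path
            upsert(data)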
```
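
The `soft_delete` and `upsert` helpers are not shown in the skill. The sketch below is one way they might look against SQLite. The `customers` table, its `is_deleted` flag column, and the shared `conn` connection are hypothetical. `process_cdc_events` is repeated so the block runs on its own.

```python
import sqlite3

# Hypothetical target table with a soft-delete flag.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE customers ("
    "id INTEGER PRIMARY KEY, name TEXT, is_deleted INTEGER NOT NULL DEFAULT 0)"
)

def upsert(data: dict) -> None:
    """Insert the row, or overwrite it if the id already exists."""
    conn.execute(
        "INSERT OR REPLACE INTO customers (id, name, is_deleted) "
        "VALUES (:id, :name, 0)",
        data,
    )

def soft_delete(row_id: int) -> None:
    """Flag the row as deleted instead of removing it."""
    conn.execute("UPDATE customers SET is_deleted = 1 WHERE id = ?", (row_id,))

def process_cdc_events(events: list[dict]) -> None:
    for event in events:
        op = event["operation"]  # INSERT, UPDATE, DELETE
        data = event["data"]
        if op == "DELETE":
            soft_delete(data["id"])
        else:
            upsert(data)

# Usage: events are applied in order, so the later UPDATE wins for id 1.
process_cdc_events([
    {"operation": "INSERT", "data": {"id": 1, "name": "Ada"}},
    {"operation": "UPDATE", "data": {"id": 1, "name": "Ada L."}},
    {"operation": "INSERT", "data": {"id": 2, "name": "Bob"}},
    {"operation": "DELETE", "data": {"id": 2}},
])
```

Since both helpers are safe to repeat, replaying the same batch of events should leave the table in the same state.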

### Full Refresh with Swap

```python
def full_refresh_with_swap(df: pd.DataFrame, table: str) -> None:
    ...  # body not included in this excerpt
```
