Incremental data loading patterns including backfill strategies, CDC, timestamp-based loads, and pipeline orchestration.
plugins/majestic-data/skills/etl-incremental-patterns/SKILL.md
January 24, 2026
# ETL Incremental Patterns
Patterns for incremental data loading and backfill operations.
## Backfill Strategy
```python
from datetime import date, timedelta
from typing import Callable
from concurrent.futures import ThreadPoolExecutor, as_completed

def backfill_date_range(
    start: date,
    end: date,
    process_fn: Callable[[date], None],
    parallel: int = 4,
) -> None:
    """Backfill data for a date range, one task per day."""
    dates = []
    current = start
    while current <= end:
        dates.append(current)
        current += timedelta(days=1)

    # Process in parallel with controlled concurrency
    with ThreadPoolExecutor(max_workers=parallel) as executor:
        futures = {executor.submit(process_fn, d): d for d in dates}
        for future in as_completed(futures):
            d = futures[future]
            try:
                future.result()
                print(f"Completed: {d}")
            except Exception as e:
                print(f"Failed: {d} - {e}")

# Usage (process_daily_data is your per-day load function)
backfill_date_range(
    start=date(2024, 1, 1),
    end=date(2024, 3, 31),
    process_fn=process_daily_data,
    parallel=4,
)
```
## Incremental Load Patterns
### Timestamp-Based Incremental
```python
import pandas as pd
from sqlalchemy import create_engine, text

engine = create_engine("postgresql://user:pass@host/db")  # adjust for your database

def incremental_by_timestamp(table: str, timestamp_col: str) -> pd.DataFrame:
    """Load only rows newer than the stored watermark, then advance it.

    Relies on get_last_run_timestamp / set_last_run_timestamp helpers that
    persist a per-table watermark. `table` and `timestamp_col` are interpolated
    into the SQL, so they must come from trusted config, never user input.
    """
    last_run = get_last_run_timestamp(table)
    query = text(f"""
        SELECT * FROM {table}
        WHERE {timestamp_col} > :last_run
        ORDER BY {timestamp_col}
    """)
    df = pd.read_sql(query, engine, params={"last_run": last_run})
    if not df.empty:
        set_last_run_timestamp(table, df[timestamp_col].max())
    return df
```
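The pattern above assumes `get_last_run_timestamp` and `set_last_run_timestamp` helpers that persist a watermark between runs. A minimal sketch backed by a JSON file; in production the watermark would live in a database table or your orchestrator's state store (the file name and default are assumptions, not part of the skill above):

```python
import json
from pathlib import Path

# Minimal watermark store: one JSON file mapping table name -> last-seen timestamp.
WATERMARK_FILE = Path("watermarks.json")
DEFAULT_WATERMARK = "1970-01-01T00:00:00"  # first run loads everything

def get_last_run_timestamp(table: str) -> str:
    if WATERMARK_FILE.exists():
        marks = json.loads(WATERMARK_FILE.read_text())
        return marks.get(table, DEFAULT_WATERMARK)
    return DEFAULT_WATERMARK

def set_last_run_timestamp(table: str, ts) -> None:
    marks = json.loads(WATERMARK_FILE.read_text()) if WATERMARK_FILE.exists() else {}
    marks[table] = str(ts)  # pandas Timestamps stringify to ISO format
    WATERMARK_FILE.write_text(json.dumps(marks))
```

One design note: the watermark is advanced only after the load succeeds, so a failed run simply re-reads the same window on retry, which is why the downstream write should be an upsert rather than a blind insert.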
### Change Data Capture (CDC)
```python
def process_cdc_events(events: list[dict]) -> None:
    """Apply a batch of CDC events in commit order.

    Assumes `upsert` and `soft_delete` helpers from your persistence layer;
    events must be applied in order or later updates can be overwritten.
    """
    for event in events:
        op = event['operation']  # INSERT, UPDATE, DELETE
        data = event['data']
        if op == 'DELETE':
            soft_delete(data['id'])
        else:
            upsert(data)
```
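A self-contained demo of this handler against an in-memory store; `upsert` and `soft_delete` here are illustrative stand-ins for a real persistence layer, and the handler is repeated so the example runs on its own:

```python
# Dict keyed by row id, standing in for the target table.
store: dict[int, dict] = {}

def upsert(data: dict) -> None:
    store[data["id"]] = data  # insert or fully replace the row

def soft_delete(row_id: int) -> None:
    if row_id in store:
        store[row_id]["deleted"] = True  # mark rather than remove

def process_cdc_events(events: list[dict]) -> None:
    for event in events:
        op = event["operation"]  # INSERT, UPDATE, DELETE
        data = event["data"]
        if op == "DELETE":
            soft_delete(data["id"])
        else:
            upsert(data)

# Events for one row, in commit order: created, renamed, then deleted.
events = [
    {"operation": "INSERT", "data": {"id": 1, "name": "alice"}},
    {"operation": "UPDATE", "data": {"id": 1, "name": "alicia"}},
    {"operation": "DELETE", "data": {"id": 1}},
]
process_cdc_events(events)
```

After the batch, the row survives with its last-known attributes plus a `deleted` flag, which is the point of soft deletes: downstream consumers can still see what was removed.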
### Full Refresh with Swap
```python
def full_refresh_with_swap(df: pd.DataFrame, table: str) -> None:
    """Load into a staging table, then atomically swap it into place."""
    staging = f"{table}__staging"
    df.to_sql(staging, engine, if_exists="replace", index=False)
    # Single transaction so readers never see a missing or half-loaded table.
    # Relies on transactional DDL (e.g. PostgreSQL); other databases may need
    # their own rename/swap mechanism.
    with engine.begin() as conn:
        conn.execute(text(f"DROP TABLE IF EXISTS {table}"))
        conn.execute(text(f"ALTER TABLE {staging} RENAME TO {table}"))
```