Data ingestion patterns for loading data from cloud storage, APIs, files, and streaming sources into databases. Use when importing CSV/JSON/Parquet files, pulling from S3/GCS buckets, consuming API feeds, or building ETL pipelines.
# Data Ingestion Patterns
This skill provides patterns for getting data INTO systems from external sources.
## When to Use This Skill
- Importing CSV, JSON, Parquet, or Excel files
- Loading data from S3, GCS, or Azure Blob storage
- Consuming REST/GraphQL API feeds
- Building ETL/ELT pipelines
- Database migration and CDC (Change Data Capture)
- Streaming data ingestion from Kafka/Kinesis
## Ingestion Pattern Decision Tree
```
What is your data source?
├── Cloud Storage (S3, GCS, Azure) → See cloud-storage.md
├── Files (CSV, JSON, Parquet) → See file-formats.md
├── REST/GraphQL APIs → See api-feeds.md
├── Streaming (Kafka, Kinesis) → See streaming-sources.md
├── Legacy Database → See database-migration.md
└── Need full ETL framework → See etl-tools.md
```
## Quick Start by Language
### Python (Recommended for ETL)
**dlt (data load tool) - Modern Python ETL:**
```python
import dlt
import requests

# Define a source with a single resource; "merge" deduplicates rows on the primary key
@dlt.source
def github_source(repo: str):
    @dlt.resource(write_disposition="merge", primary_key="id")
    def issues():
        response = requests.get(f"https://api.github.com/repos/{repo}/issues")
        response.raise_for_status()
        yield response.json()

    return issues

# Load to destination
pipeline = dlt.pipeline(
    pipeline_name="github_issues",
    destination="postgres",  # or duckdb, bigquery, snowflake
    dataset_name="github_data",
)
load_info = pipeline.run(github_source("owner/repo"))
print(load_info)
```
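dlt ships destination dependencies as extras, so the example above assumes something like `pip install "dlt[postgres]"` (or `dlt[duckdb]` for local testing). For repeated runs you usually also want incremental loading instead of re-pulling every issue. A minimal sketch using dlt's `dlt.sources.incremental` cursor together with GitHub's `since` query parameter; the cursor field, initial value, and repo name are illustrative:

```python
import dlt
import requests

@dlt.resource(write_disposition="merge", primary_key="id")
def issues(
    repo: str = "owner/repo",
    updated_at=dlt.sources.incremental("updated_at", initial_value="1970-01-01T00:00:00Z"),
):
    # Only request issues changed since the last successful run; dlt persists
    # the cursor value ("updated_at") in pipeline state between runs.
    response = requests.get(
        f"https://api.github.com/repos/{repo}/issues",
        params={"since": updated_at.last_value, "state": "all"},
    )
    response.raise_for_status()
    yield response.json()
```

Because the resource is declared with `write_disposition="merge"`, re-delivered rows overwrite earlier versions rather than creating duplicates.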
**Polars for file processing (faster than pandas):**
```python
import polars as pl
# Read CSV with schema inference
df = pl.read_csv("data.csv")
# Read Parquet (columnar, efficient)
df = pl.read_parquet("s3://bucket/data.parquet")
# Read JSON lines
df = pl.read_ndjson("events.jsonl")
# Write to database
df.write_database(
    table_name="events",
    connection="postgresql://user:pass@localhost/db",
    if_table_exists="append",
)
```
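For files too large to hold in memory, Polars can scan lazily and stream the result straight to disk. A minimal sketch, assuming a local `events.csv` with `id`, `status`, and `created_at` columns (the file name and columns are illustrative):

```python
import polars as pl

# Lazily scan the CSV, push the filter/projection into the scan, and stream
# the result to Parquet without materializing the full dataset in memory.
(
    pl.scan_csv("events.csv")
    .filter(pl.col("status") == "active")
    .select(["id", "status", "created_at"])
    .sink_parquet("events_clean.parquet")
)
```

The same lazy pattern works with `pl.scan_parquet` and `pl.scan_ndjson` for the other formats shown above.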
### TypeScript/Node.js
**S3 ingestion:**
```typescript
import { S3Client, GetObjectCommand } from "@aws-sdk/client-s3";
import { p