Use when applying FastAPI async patterns to build high-performance APIs, or when handling concurrent requests and async operations.
View on GitHub: TheBushidoCollective/han
jutsu-fastapi
January 24, 2026
npx add-skill https://github.com/TheBushidoCollective/han/blob/main/jutsu/jutsu-fastapi/skills/fastapi-async-patterns/SKILL.md -a claude-code --skill fastapi-async-patterns
Installation paths:
.claude/skills/fastapi-async-patterns/
# FastAPI Async Patterns
Master async patterns in FastAPI for building high-performance,
concurrent APIs with optimal resource usage.
## Basic Async Route Handlers
Understanding async vs sync endpoints in FastAPI.
```python
import asyncio
import time

import httpx
from fastapi import FastAPI

app = FastAPI()


# Sync endpoint: FastAPI runs plain `def` handlers in a threadpool,
# so the event loop stays free but one worker thread is tied up.
@app.get('/sync')
def sync_endpoint():
    time.sleep(1)  # Blocks a threadpool worker, not the event loop
    return {'message': 'Completed after 1 second'}


# Async endpoint (non-blocking)
@app.get('/async')
async def async_endpoint():
    await asyncio.sleep(1)  # Other requests can be handled meanwhile
    return {'message': 'Completed after 1 second'}


# CPU-bound work: use a sync handler so it runs in the threadpool
# instead of stalling the event loop
@app.get('/cpu-intensive')
def cpu_intensive():
    result = sum(i * i for i in range(10000000))
    return {'result': result}


# I/O-bound work (use async)
@app.get('/io-intensive')
async def io_intensive():
    async with httpx.AsyncClient() as client:
        response = await client.get('https://api.example.com/data')
        return response.json()
```
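The difference between blocking and non-blocking waits can be checked without a running server. The following is a minimal sketch using only the standard library; the handler names and the `timed` helper are hypothetical and stand in for route handlers. It runs three copies of each coroutine concurrently and measures the elapsed time, assuming Python 3.9+ for `asyncio.to_thread`.

```python
import asyncio
import time


async def blocking_handler() -> str:
    # Anti-pattern: a blocking call inside a coroutine stalls the event loop
    time.sleep(0.2)
    return 'blocking'


async def non_blocking_handler() -> str:
    # Yields control to the event loop while waiting
    await asyncio.sleep(0.2)
    return 'non-blocking'


async def offloaded_handler() -> str:
    # Runs the blocking call in a worker thread so the loop stays responsive
    await asyncio.to_thread(time.sleep, 0.2)
    return 'offloaded'


async def timed(handler, n: int = 3) -> float:
    """Run n copies of handler concurrently and return elapsed seconds."""
    start = time.perf_counter()
    await asyncio.gather(*(handler() for _ in range(n)))
    return time.perf_counter() - start


async def main() -> dict:
    return {
        'blocking': await timed(blocking_handler),
        'non_blocking': await timed(non_blocking_handler),
        'offloaded': await timed(offloaded_handler),
    }


if __name__ == '__main__':
    for name, seconds in asyncio.run(main()).items():
        print(f'{name}: {seconds:.2f}s')
```

The blocking variant runs its three calls one after another, while the other two overlap their waits. Inside an `async def` route, a blocking library call is therefore best awaited through an async client or pushed to a thread.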
## Async Database Operations
Async database patterns with popular ORMs and libraries.
```python
from typing import AsyncGenerator

from fastapi import FastAPI, Depends, HTTPException
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy import select
import asyncpg
from motor.motor_asyncio import AsyncIOMotorClient
from tortoise import Tortoise
from tortoise.contrib.fastapi import register_tortoise

app = FastAPI()

# SQLAlchemy async setup
DATABASE_URL = 'postgresql+asyncpg://user:pass@localhost/db'
engine = create_async_engine(DATABASE_URL, echo=True, future=True)
AsyncSessionLocal = sessionmaker(
    engine, class_=AsyncSession, expire_on_commit=False
)


async def get_db() -> AsyncGenerator[AsyncSession, None]:
    # Yield one session per request: commit on success, roll back on error
    async with AsyncSessionLocal() as session:
        try:
            yield session
            await session.commit()
        except Exception:
            await session.rollback()
            raise
@