Python asyncio patterns with TaskGroup, structured concurrency, and modern 3.11+ features. Use when implementing concurrent operations, async context managers, or high-performance async services.
GitHub: yonatangross/orchestkit (plugin: ork)
January 25, 2026
npx add-skill https://github.com/yonatangross/orchestkit/blob/main/plugins/ork/skills/asyncio-advanced/SKILL.md -a claude-code --skill asyncio-advanced
Installation paths:
.claude/skills/asyncio-advanced/

# Asyncio Advanced Patterns (2026)
Modern Python asyncio patterns using structured concurrency, TaskGroup, and Python 3.11+ features.
## Overview
Use these patterns when:
- Implementing concurrent HTTP requests or database queries
- Building async services with proper cancellation handling
- Managing multiple concurrent tasks with error propagation
- Rate limiting async operations with semaphores
- Bridging sync code to async contexts
## Quick Reference
### TaskGroup (Replaces gather)
```python
import asyncio

# fetch_user, fetch_orders, and fetch_preferences are assumed to be
# coroutine functions defined elsewhere.
async def fetch_user_data(user_id: str) -> dict:
    """Fetch user data concurrently; if any task fails, the rest are cancelled."""
    async with asyncio.TaskGroup() as tg:
        user_task = tg.create_task(fetch_user(user_id))
        orders_task = tg.create_task(fetch_orders(user_id))
        preferences_task = tg.create_task(fetch_preferences(user_id))

    # All tasks are guaranteed complete here
    return {
        "user": user_task.result(),
        "orders": orders_task.result(),
        "preferences": preferences_task.result(),
    }
```
### TaskGroup with Timeout
```python
import asyncio

# fetch_url is assumed to be a coroutine function defined elsewhere.
async def fetch_with_timeout(urls: list[str], timeout_sec: float = 30) -> list[dict]:
    """Fetch all URLs with an overall timeout; raises TimeoutError if exceeded."""
    async with asyncio.timeout(timeout_sec):
        async with asyncio.TaskGroup() as tg:
            tasks = [tg.create_task(fetch_url(url)) for url in urls]

    return [t.result() for t in tasks]
```
### Semaphore for Concurrency Limiting
```python
import asyncio

import aiohttp

class RateLimitedClient:
    """HTTP client with concurrency limiting."""

    def __init__(self, max_concurrent: int = 10):
        self._semaphore = asyncio.Semaphore(max_concurrent)
        self._session: aiohttp.ClientSession | None = None

    async def fetch(self, url: str) -> dict:
        if self._session is None:
            raise RuntimeError("session is not initialized")
        async with self._semaphore:  # Limit concurrent requests
            async with self._session.get(url) as response:
                return await response.json()

    async def fetch_many(se