Message queue patterns with RabbitMQ, Redis Streams, and Kafka. Use when implementing async communication, pub/sub systems, event-driven microservices, or reliable message delivery.
# Message Queue Patterns (2026)

Source: yonatangross/skillforge-claude-plugin (orchestkit-complete), January 23, 2026.
Asynchronous communication patterns for distributed systems using RabbitMQ, Redis Streams, Kafka, and FastStream.
## Overview
- Decoupling services in microservices architecture
- Implementing pub/sub and work queue patterns
- Building event-driven systems with reliable delivery
- Load leveling and buffering between services
- Task distribution across multiple workers
- High-throughput event streaming (Kafka)
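
The work-queue and load-leveling ideas above can be sketched broker-free, with `asyncio.Queue` standing in for the broker (a minimal sketch; a real broker adds persistence and acknowledgements on top of this):

```python
import asyncio

async def worker(name: str, queue: asyncio.Queue, results: list) -> None:
    # Each worker competes for tasks; the shared queue levels load between them.
    while True:
        task = await queue.get()
        results.append((name, task))
        queue.task_done()

async def main() -> list:
    queue: asyncio.Queue = asyncio.Queue()
    results: list = []
    workers = [
        asyncio.create_task(worker(f"w{i}", queue, results)) for i in range(3)
    ]
    for n in range(6):
        queue.put_nowait(f"task-{n}")
    await queue.join()  # block until every task has been processed
    for w in workers:
        w.cancel()
    return results

results = asyncio.run(main())
print(len(results))  # → 6 tasks processed across 3 workers
```

The same shape carries over to any of the brokers below: producers enqueue, competing consumers dequeue, and the queue absorbs bursts.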
## Quick Reference
### FastStream: Unified API (2026 Recommended)
```python
# pip install "faststream[kafka,rabbit,redis]"
from faststream import FastStream
from faststream.kafka import KafkaBroker
from pydantic import BaseModel

broker = KafkaBroker("localhost:9092")
app = FastStream(broker)

class OrderCreated(BaseModel):
    order_id: str
    customer_id: str
    total: float

@broker.subscriber("orders.created")
async def handle_order(event: OrderCreated):
    """Automatic Pydantic validation and deserialization."""
    print(f"Processing order {event.order_id}")
    await process_order(event)  # application-specific business logic

@broker.publisher("orders.processed")
async def publish_processed(order_id: str) -> dict:
    return {"order_id": order_id, "status": "processed"}

# Run with: faststream run app:app
```
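
The validate-then-dispatch step that FastStream performs on each incoming message can be sketched with the standard library alone (a dataclass stands in for the Pydantic model; `deserialize` is a hypothetical name):

```python
import json
from dataclasses import dataclass

@dataclass
class OrderCreated:
    order_id: str
    customer_id: str
    total: float

def deserialize(raw: bytes) -> OrderCreated:
    # FastStream does this via Pydantic, which also validates field types;
    # a plain dataclass only checks that the field names match.
    data = json.loads(raw)
    return OrderCreated(**data)

event = deserialize(b'{"order_id": "o-1", "customer_id": "c-9", "total": 42.5}')
print(event.order_id)  # → o-1
```

Pydantic buys you type coercion and clear validation errors at the broker boundary, which is exactly where malformed payloads tend to arrive.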
### Kafka Producer (aiokafka)
```python
from aiokafka import AIOKafkaProducer
import json

class KafkaPublisher:
    def __init__(self, bootstrap_servers: str):
        self.bootstrap_servers = bootstrap_servers
        self._producer: AIOKafkaProducer | None = None

    async def start(self):
        self._producer = AIOKafkaProducer(
            bootstrap_servers=self.bootstrap_servers,
            value_serializer=lambda v: json.dumps(v).encode(),
            acks="all",  # wait for all in-sync replicas
            enable_idempotence=True,  # no duplicates on producer retry
        )
        await self._producer.start()

    async def publish(
        self,
        topic: str,
        value: dict,
        key: str | None = None,
    ):
        assert self._producer is not None, "call start() first"
        await self._producer.send_and_wait(
            topic,
            value=value,
            key=key.encode() if key else None,
        )

    async def stop(self):
        if self._producer:
            await self._producer.stop()
```
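
The consumer side pairs with this producer: for at-least-once delivery, process the record first and commit its offset only afterwards, so a crash before the commit causes redelivery rather than loss. The ordering can be sketched broker-free (in-memory stand-ins; `InMemoryLog` is a hypothetical name, not an aiokafka API):

```python
class InMemoryLog:
    """Stand-in for a partition: an append-only record list plus a committed offset."""

    def __init__(self, records: list):
        self.records = records
        self.committed = 0  # offset of the next record to consume

    def poll(self) -> list:
        return self.records[self.committed:]

    def commit(self, offset: int) -> None:
        self.committed = offset

processed: list = []
log = InMemoryLog(["evt-1", "evt-2", "evt-3"])
for i, record in enumerate(log.poll(), start=log.committed):
    processed.append(record)  # process first...
    log.commit(i + 1)         # ...commit after: crash before commit => redelivery
```

With a real `AIOKafkaConsumer` the same discipline means disabling auto-commit and calling `commit()` after the handler succeeds; the trade-off is possible duplicate processing, so handlers should be idempotent.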