message-queues

verified

Message queue patterns with RabbitMQ, Redis Streams, and Kafka. Use when implementing async communication, pub/sub systems, event-driven microservices, or reliable message delivery.

Marketplace

orchestkit

yonatangross/skillforge-claude-plugin

Plugin

orchestkit-complete

development

Repository

yonatangross/skillforge-claude-plugin
33 stars

./skills/message-queues/SKILL.md

Last Verified

January 23, 2026

Install Skill

npx add-skill https://github.com/yonatangross/skillforge-claude-plugin/blob/main/./skills/message-queues/SKILL.md -a claude-code --skill message-queues

Installation paths:

Claude
.claude/skills/message-queues/

Instructions

# Message Queue Patterns (2026)

Asynchronous communication patterns for distributed systems using RabbitMQ, Redis Streams, Kafka, and FastStream.

## Overview

- Decoupling services in a microservices architecture
- Implementing pub/sub and work queue patterns
- Building event-driven systems with reliable delivery
- Load leveling and buffering between services
- Task distribution across multiple workers
- High-throughput event streaming (Kafka)
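The work queue, load leveling, and task distribution ideas above can be sketched without any broker. The following is a minimal in-process illustration using `asyncio.Queue` from the standard library; it is not how RabbitMQ, Redis Streams, or Kafka are implemented, and the names `worker` and `run_work_queue` are hypothetical.

```python
import asyncio


async def worker(name: str, queue: asyncio.Queue, results: list) -> None:
    """Pull jobs until cancelled; each job is handled by exactly one worker."""
    while True:
        job = await queue.get()
        try:
            await asyncio.sleep(0)  # stand-in for real work
            results.append((name, job))
        finally:
            queue.task_done()  # acknowledge the job


async def run_work_queue(jobs: list, num_workers: int = 3, max_buffered: int = 2) -> list:
    # A bounded queue gives load leveling: put() waits while the buffer is full.
    queue: asyncio.Queue = asyncio.Queue(maxsize=max_buffered)
    results: list = []
    workers = [
        asyncio.create_task(worker(f"worker-{i}", queue, results))
        for i in range(num_workers)
    ]
    for job in jobs:
        await queue.put(job)
    await queue.join()  # wait until every job has been acknowledged
    for task in workers:
        task.cancel()
    await asyncio.gather(*workers, return_exceptions=True)
    return results


if __name__ == "__main__":
    done = asyncio.run(run_work_queue(list(range(10))))
    print(sorted(job for _, job in done))
```

A real broker adds what this sketch lacks: persistence, delivery across processes and hosts, and redelivery when a worker dies before acknowledging.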

## Quick Reference

### FastStream: Unified API (2026 Recommended)

```python
# pip install "faststream[kafka,rabbit,redis]"  (quotes keep shells such as zsh from expanding the brackets)
from faststream import FastStream
from faststream.kafka import KafkaBroker
from pydantic import BaseModel

broker = KafkaBroker("localhost:9092")
app = FastStream(broker)

class OrderCreated(BaseModel):
    order_id: str
    customer_id: str
    total: float

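async def process_order(event: OrderCreated) -> None:
    """Placeholder for application logic (not part of FastStream)."""

# The publisher decorator takes effect when stacked on a subscriber:
# the handler's return value is published to "orders.processed".
@broker.subscriber("orders.created")
@broker.publisher("orders.processed")
async def handle_order(event: OrderCreated) -> dict:
    """Automatic Pydantic validation and deserialization."""
    print(f"Processing order {event.order_id}")
    await process_order(event)
    return {"order_id": event.order_id, "status": "processed"}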

# Run with: faststream run app:app
```
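To make "validation and deserialization" concrete, here is a rough standard-library sketch of what has to happen to a raw message before a typed handler can run. It is a stand-in for illustration only, not FastStream's or Pydantic's actual mechanism; `decode_order` is a hypothetical helper, and the dataclass mirrors the `OrderCreated` model above.

```python
import json
from dataclasses import dataclass


@dataclass(frozen=True)
class OrderCreated:
    order_id: str
    customer_id: str
    total: float


def decode_order(raw: bytes) -> OrderCreated:
    """Parse a JSON payload and check field types; raise ValueError if malformed."""
    try:
        data = json.loads(raw)
    except json.JSONDecodeError as exc:
        raise ValueError(f"not valid JSON: {exc}") from exc
    if not isinstance(data, dict):
        raise ValueError("payload must be a JSON object")
    try:
        order_id = data["order_id"]
        customer_id = data["customer_id"]
        total = data["total"]
    except KeyError as exc:
        raise ValueError(f"missing field: {exc.args[0]}") from exc
    if not isinstance(order_id, str) or not isinstance(customer_id, str):
        raise ValueError("order_id and customer_id must be strings")
    # bool is a subclass of int, so exclude it explicitly.
    if isinstance(total, bool) or not isinstance(total, (int, float)):
        raise ValueError("total must be a number")
    return OrderCreated(order_id, customer_id, float(total))
```

Rejecting malformed payloads at the boundary keeps handlers free of defensive checks; where rejected messages should go (for example a dead-letter topic) is a separate design decision.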

### Kafka Producer (aiokafka)

```python
from aiokafka import AIOKafkaProducer
import json

class KafkaPublisher:
    def __init__(self, bootstrap_servers: str):
        self.bootstrap_servers = bootstrap_servers
        self._producer: AIOKafkaProducer | None = None

    async def start(self):
        self._producer = AIOKafkaProducer(
            bootstrap_servers=self.bootstrap_servers,
            value_serializer=lambda v: json.dumps(v).encode(),
            acks="all",  # Wait for all replicas
            enable_idempotence=True,  # Idempotent producer: retries do not duplicate messages (full exactly-once also needs transactions)
        )
        await self._producer.start()

    async def publish(
        self,
        topic: str,
        value: dict,
        key: str | None = None,
    ):
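        # The source is truncated at this call; the lines below are an assumed completion.
        if self._producer is None:
            raise RuntimeError("call start() before publish()")
        await self._producer.send_and_wait(
            topic,
            value=value,
            # No key_serializer is configured, so the key must already be bytes.
            key=key.encode() if key is not None else None,
        )
```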
