Back to Skills

tokio-concurrency

verified

Advanced concurrency patterns for Tokio including fan-out/fan-in, pipeline processing, rate limiting, and coordinated shutdown. Use when building high-concurrency async systems.

View on GitHub

Marketplace

geoffjay-claude-plugins

geoffjay/claude-plugins

Plugin

rust-tokio-expert

languages

Repository

geoffjay/claude-plugins
7stars

plugins/rust-tokio-expert/skills/tokio-concurrency/SKILL.md

Last Verified

January 20, 2026

Install Skill

Select agents to install to:

Scope:
npx add-skill https://github.com/geoffjay/claude-plugins/blob/main/plugins/rust-tokio-expert/skills/tokio-concurrency/SKILL.md -a claude-code --skill tokio-concurrency

Installation paths:

Claude
.claude/skills/tokio-concurrency/
Powered by add-skill CLI

Instructions

# Tokio Concurrency Patterns

This skill provides advanced concurrency patterns for building scalable async applications with Tokio.

## Fan-Out/Fan-In Pattern

Distribute work across multiple workers and collect results:

```rust
use futures::stream::{self, StreamExt};

pub async fn fan_out_fan_in<T, R>(
    items: Vec<T>,
    concurrency: usize,
    process: impl Fn(T) -> Pin<Box<dyn Future<Output = R> + Send>> + Send + Sync + 'static,
) -> Vec<R>
where
    T: Send + 'static,
    R: Send + 'static,
{
    stream::iter(items)
        .map(|item| process(item))
        .buffer_unordered(concurrency)
        .collect()
        .await
}

// Usage
let results = fan_out_fan_in(
    items,
    10,
    |item| Box::pin(async move { process_item(item).await })
).await;
```

## Pipeline Processing

Chain async processing stages:

```rust
use tokio::sync::mpsc;

pub struct Pipeline<T> {
    stages: Vec<Box<dyn Stage<T>>>,
}

#[async_trait::async_trait]
pub trait Stage<T>: Send {
    async fn process(&self, item: T) -> T;
}

impl<T: Send + 'static> Pipeline<T> {
    pub fn new() -> Self {
        Self { stages: Vec::new() }
    }

    pub fn add_stage<S: Stage<T> + 'static>(mut self, stage: S) -> Self {
        self.stages.push(Box::new(stage));
        self
    }

    pub async fn run(self, mut input: mpsc::Receiver<T>) -> mpsc::Receiver<T> {
        let (tx, rx) = mpsc::channel(100);

        tokio::spawn(async move {
            while let Some(mut item) = input.recv().await {
                // Process through all stages
                for stage in &self.stages {
                    item = stage.process(item).await;
                }

                if tx.send(item).await.is_err() {
                    break;
                }
            }
        });

        rx
    }
}

// Usage
let pipeline = Pipeline::new()
    .add_stage(ValidationStage)
    .add_stage(TransformStage)
    .add_stage(EnrichmentStage);

let output = pipeline.run(input_channel).await;
```

## Rate Limit

Validation Details

Front Matter
Required Fields
Valid Name Format
Valid Description
Has Sections
Allowed Tools
Instruction Length:
9939 chars