Advanced concurrency patterns for Tokio including fan-out/fan-in, pipeline processing, rate limiting, and coordinated shutdown. Use when building high-concurrency async systems.
rust-tokio-expert
January 20, 2026
Install with `npx add-skill https://github.com/geoffjay/claude-plugins/blob/main/plugins/rust-tokio-expert/skills/tokio-concurrency/SKILL.md -a claude-code --skill tokio-concurrency`.
Installation path: `.claude/skills/tokio-concurrency/`
# Tokio Concurrency Patterns
This skill provides advanced concurrency patterns for building scalable async applications with Tokio.
## Fan-Out/Fan-In Pattern
Distribute work across multiple workers and collect results:
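```rust
use std::future::Future;
use std::pin::Pin;

use futures::stream::{self, StreamExt};

/// Runs `process` on every item with at most `concurrency` futures in flight.
/// Results are collected in completion order, not input order.
pub async fn fan_out_fan_in<T, R>(
    items: Vec<T>,
    concurrency: usize,
    process: impl Fn(T) -> Pin<Box<dyn Future<Output = R> + Send>> + Send + Sync + 'static,
) -> Vec<R>
where
    T: Send + 'static,
    R: Send + 'static,
{
    stream::iter(items)
        .map(process)
        .buffer_unordered(concurrency)
        .collect()
        .await
}

// Usage (inside an async context; `items` and `process_item` are defined elsewhere)
let results = fan_out_fan_in(
    items,
    10,
    |item| Box::pin(async move { process_item(item).await }),
).await;
```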
## Pipeline Processing
Chain async processing stages:
```rust
use tokio::sync::mpsc;

pub struct Pipeline<T> {
    stages: Vec<Box<dyn Stage<T>>>,
}

// `Sync` is required: the spawned task holds `&self.stages` across `.await`,
// and that reference is only `Send` if the stages are `Sync`.
#[async_trait::async_trait]
pub trait Stage<T>: Send + Sync {
    async fn process(&self, item: T) -> T;
}

impl<T: Send + 'static> Pipeline<T> {
    pub fn new() -> Self {
        Self { stages: Vec::new() }
    }

    pub fn add_stage<S: Stage<T> + 'static>(mut self, stage: S) -> Self {
        self.stages.push(Box::new(stage));
        self
    }

    /// Spawns a task that runs each input item through every stage in order
    /// and returns the receiving end of the output channel.
    pub async fn run(self, mut input: mpsc::Receiver<T>) -> mpsc::Receiver<T> {
        let (tx, rx) = mpsc::channel(100);

        tokio::spawn(async move {
            while let Some(mut item) = input.recv().await {
                // Process through all stages
                for stage in &self.stages {
                    item = stage.process(item).await;
                }

                // Stop if the consumer dropped the output receiver
                if tx.send(item).await.is_err() {
                    break;
                }
            }
        });

        rx
    }
}

// Usage (inside an async context; the stage types and `input_channel` are defined elsewhere)
let pipeline = Pipeline::new()
    .add_stage(ValidationStage)
    .add_stage(TransformStage)
    .add_stage(EnrichmentStage);

let output = pipeline.run(input_channel).await;
```
## Rate Limit