Implements Pub-Sub patterns using System.Reactive and Channels for event-based communication in .NET. Use when building reactive applications or decoupled event-driven architectures.
GitHub: christian289/dotnet-with-claudecode
wpf-dev-pack
January 23, 2026
Install: `npx add-skill https://github.com/christian289/dotnet-with-claudecode/blob/main/wpf-dev-pack/skills/implementing-pubsub-pattern/SKILL.md -a claude-code --skill implementing-pubsub-pattern`
Installation paths: `.claude/skills/implementing-pubsub-pattern/`
# .NET Pub-Sub Pattern
A guide to Pub-Sub patterns for event-based asynchronous communication in .NET.
**Quick Reference:** See [QUICKREF.md](QUICKREF.md) for essential patterns at a glance.
## 1. Core APIs
| API | Purpose | NuGet |
|-----|---------|-------|
| `System.Reactive` (Rx.NET) | Reactive event streams | System.Reactive |
| `System.Threading.Channels` | Async producer-consumer queues | BCL (.NET Core 3.0+); `System.Threading.Channels` package for older targets |
| `IObservable<T>` | Observable sequence | BCL |
---
## 2. System.Threading.Channels
### 2.1 Basic Usage
```csharp
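using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Message and HandleMessage are application-specific and defined elsewhere.
public sealed class MessageProcessor
{
    // Unbounded: writes never wait, so memory grows if the consumer falls behind
    private readonly Channel<Message> _channel =
        Channel.CreateUnbounded<Message>();

    // Producer - Send message
    public async Task SendAsync(Message message, CancellationToken ct = default)
    {
        await _channel.Writer.WriteAsync(message, ct);
    }

    // Consumer - Process messages until the channel completes or ct is cancelled
    public async Task ProcessAsync(CancellationToken ct)
    {
        await foreach (var message in _channel.Reader.ReadAllAsync(ct))
        {
            await HandleMessage(message);
        }
    }

    // Channel completion signal - the consumer loop ends once buffered items are drained
    public void Complete() => _channel.Writer.Complete();
}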
```
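To show how the producer, consumer, and completion signal fit together, here is a minimal end-to-end sketch. It does not reuse `MessageProcessor`; the `Message` record and the `ChannelDemo` class are hypothetical names introduced only for illustration, since the guide does not define `Message`.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical message type (assumption: the guide does not define Message).
public sealed record Message(int Id, string Text);

public static class ChannelDemo
{
    // Writes `count` messages, completes the channel, and returns the ids
    // in the order the consumer observed them.
    public static async Task<List<int>> RunAsync(int count)
    {
        var channel = Channel.CreateUnbounded<Message>();
        var received = new List<int>();

        // Consumer: ReadAllAsync ends once the writer has completed
        // and every buffered item has been read.
        var consumer = Task.Run(async () =>
        {
            await foreach (var message in channel.Reader.ReadAllAsync())
            {
                received.Add(message.Id);
            }
        });

        // Producer: writes to an unbounded channel complete immediately.
        for (var i = 0; i < count; i++)
        {
            await channel.Writer.WriteAsync(new Message(i, $"payload {i}"));
        }

        // Without Complete(), the consumer loop would never finish.
        channel.Writer.Complete();
        await consumer;
        return received;
    }
}
```

Calling `await ChannelDemo.RunAsync(5)` should return the ids 0 through 4 in write order, because a single reader consumes the channel in FIFO order.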
### 2.2 Bounded Channel (Backpressure Control)
```csharp
// Backpressure control with buffer size limit
var options = new BoundedChannelOptions(capacity: 100)
{
    FullMode = BoundedChannelFullMode.Wait, // Wait when full
    SingleReader = true,
    SingleWriter = false
};
var channel = Channel.CreateBounded<Message>(options);
// Writer waits until space is available
await channel.Writer.WriteAsync(message);
```
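The effect of `BoundedChannelFullMode.Wait` can be observed directly with a small sketch. `BackpressureDemo` and its method names are hypothetical and exist only for illustration. The first method uses the non-blocking `TryWrite`, which returns `false` once the buffer is full. The second shows a `WriteAsync` call that stays pending until a reader frees a slot.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class BackpressureDemo
{
    // Returns how many of `attempts` non-blocking writes the channel accepted.
    public static int CountAcceptedWrites(int capacity, int attempts)
    {
        var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(capacity)
        {
            FullMode = BoundedChannelFullMode.Wait
        });

        var accepted = 0;
        for (var i = 0; i < attempts; i++)
        {
            // TryWrite never waits; in Wait mode it returns false when the buffer is full.
            if (channel.Writer.TryWrite(i)) accepted++;
        }
        return accepted;
    }

    // Returns true if a write to a full channel was still pending before a read freed a slot.
    public static async Task<bool> WriterWaitsUntilReadAsync()
    {
        var channel = Channel.CreateBounded<int>(1); // FullMode defaults to Wait
        await channel.Writer.WriteAsync(1);          // fills the only slot

        ValueTask pending = channel.Writer.WriteAsync(2); // no space: stays pending
        var waited = !pending.IsCompleted;

        await channel.Reader.ReadAsync(); // frees the slot, letting the pending write proceed
        await pending;
        return waited;
    }
}
```

With a capacity of 3 and 5 attempts, `CountAcceptedWrites` should accept exactly 3 writes. Producers that must not lose data would normally await `WriteAsync` instead, which is what slows them to the consumer's pace.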
### 2.3 Multiple Consumer Pattern
```csharp
public sealed class WorkerPool
{
    private readonly Channel<WorkItem> _channel;
    private readonly int _workerCount;

    public WorkerPool(int workerCount = 4)
    {
        _workerCount = workerCount;
        _channel = Channel.CreateUnbounded<WorkItem>();
    }

    public async Task StartAsync(CancellationToken ct)
    {
        var workers = Enumerable.Range(0,