Back to list
christian289

implementing-pubsub-pattern

by christian289

ClaudeCode와 함께하는 .NET 개발 튜토리얼

1🍴 0📅 Jan 25, 2026

SKILL.md


name: implementing-pubsub-pattern description: "Implements Pub-Sub patterns using System.Reactive and Channels for event-based communication in .NET. Use when building reactive applications or decoupled event-driven architectures."

.NET Pub-Sub Pattern

A guide for Pub-Sub patterns for event-based asynchronous communication.

Quick Reference: See QUICKREF.md for essential patterns at a glance.

1. Core APIs

APIPurposeNuGet
System.Reactive (Rx.NET)Reactive event streamsSystem.Reactive
System.Threading.ChannelsAsync Producer-ConsumerBCL
IObservable<T>Observable sequenceBCL

2. System.Threading.Channels

2.1 Basic Usage

using System.Threading.Channels;

public sealed class MessageProcessor
{
    private readonly Channel<Message> _channel =
        Channel.CreateUnbounded<Message>();

    // Producer - Send message
    public async Task SendAsync(Message message)
    {
        await _channel.Writer.WriteAsync(message);
    }

    // Consumer - Process message
    public async Task ProcessAsync(CancellationToken ct)
    {
        await foreach (var message in _channel.Reader.ReadAllAsync(ct))
        {
            await HandleMessage(message);
        }
    }

    // Channel completion signal
    public void Complete() => _channel.Writer.Complete();
}

2.2 Bounded Channel (Backpressure Control)

// Backpressure control with buffer size limit
var options = new BoundedChannelOptions(capacity: 100)
{
    FullMode = BoundedChannelFullMode.Wait, // Wait when full
    SingleReader = true,
    SingleWriter = false
};

var channel = Channel.CreateBounded<Message>(options);

// Writer waits until space is available
await channel.Writer.WriteAsync(message);

2.3 Multiple Consumer Pattern

public sealed class WorkerPool
{
    private readonly Channel<WorkItem> _channel;
    private readonly int _workerCount;

    public WorkerPool(int workerCount = 4)
    {
        _workerCount = workerCount;
        _channel = Channel.CreateUnbounded<WorkItem>();
    }

    public async Task StartAsync(CancellationToken ct)
    {
        var workers = Enumerable.Range(0, _workerCount)
            .Select(_ => ProcessAsync(ct));

        await Task.WhenAll(workers);
    }

    private async Task ProcessAsync(CancellationToken ct)
    {
        await foreach (var item in _channel.Reader.ReadAllAsync(ct))
        {
            await ProcessItem(item);
        }
    }

    public ValueTask EnqueueAsync(WorkItem item) =>
        _channel.Writer.WriteAsync(item);
}

3. System.Reactive (Rx.NET)

3.1 EventAggregator Pattern

using System.Reactive.Linq;
using System.Reactive.Subjects;

public sealed class EventAggregator : IDisposable
{
    private readonly Subject<object> _subject = new();

    // Subscribe to specific event type
    public IObservable<T> GetEvent<T>() =>
        _subject.OfType<T>().AsObservable();

    // Publish event
    public void Publish<T>(T @event) =>
        _subject.OnNext(@event!);

    public void Dispose() => _subject.Dispose();
}

3.2 Usage Example

// Event definitions
public record UserLoggedIn(string UserId);
public record OrderPlaced(int OrderId);

// Subscription
var aggregator = new EventAggregator();

aggregator.GetEvent<UserLoggedIn>()
    .Subscribe(e => Console.WriteLine($"User logged in: {e.UserId}"));

aggregator.GetEvent<OrderPlaced>()
    .Where(e => e.OrderId > 100)
    .Subscribe(e => Console.WriteLine($"Large order: {e.OrderId}"));

// Publish
aggregator.Publish(new UserLoggedIn("user123"));
aggregator.Publish(new OrderPlaced(150));

3.3 Rx Operators

// Debounce - Process only the last event in a sequence
searchInput
    .Throttle(TimeSpan.FromMilliseconds(300))
    .DistinctUntilChanged()
    .Subscribe(query => Search(query));

// Buffer - Collect events for a period and process as batch
events
    .Buffer(TimeSpan.FromSeconds(5))
    .Subscribe(batch => ProcessBatch(batch));

// Retry - Retry on failure
observable
    .Retry(3)
    .Subscribe(
        onNext: data => Process(data),
        onError: ex => LogError(ex)
    );

4. Comparison: Channels vs Rx

FeatureChannelsRx.NET
PurposeProducer-ConsumerEvent streams
BackpressureBuilt-in (Bounded)Separate implementation
OperatorsBasicRich
Learning curveLowHigh
DependencyBCLNuGet

5. DI Integration

// Program.cs
services.AddSingleton(Channel.CreateUnbounded<Message>());
services.AddSingleton(sp => sp.GetRequiredService<Channel<Message>>().Reader);
services.AddSingleton(sp => sp.GetRequiredService<Channel<Message>>().Writer);

// Producer
public sealed class Producer(ChannelWriter<Message> writer)
{
    public ValueTask SendAsync(Message msg) => writer.WriteAsync(msg);
}

// Consumer
public sealed class Consumer(ChannelReader<Message> reader)
{
    public async Task ProcessAsync(CancellationToken ct)
    {
        await foreach (var msg in reader.ReadAllAsync(ct))
        {
            await Handle(msg);
        }
    }
}

6. Required NuGet Package

<ItemGroup>
  <PackageReference Include="System.Reactive" Version="6.0.*" />
</ItemGroup>

7. Important Notes

Memory Leaks

// Subscription disposal is required
var subscription = observable.Subscribe(handler);

// After use
subscription.Dispose();

Thread Safety

  • Channels are thread-safe by default
  • Subject is not thread-safe (use Synchronize() if needed)

Backpressure Handling

// Prevent memory explosion with Bounded Channel
var channel = Channel.CreateBounded<Message>(new BoundedChannelOptions(1000)
{
    FullMode = BoundedChannelFullMode.DropOldest // Drop old messages
});

8. References

Score

Total Score

65/100

Based on repository quality metrics

SKILL.md

SKILL.mdファイルが含まれている

+20
LICENSE

ライセンスが設定されている

+10
説明文

100文字以上の説明がある

0/10
人気

GitHub Stars 100以上

0/15
最近の活動

1ヶ月以内に更新

+10
フォーク

10回以上フォークされている

0/5
Issue管理

オープンIssueが50未満

+5
言語

プログラミング言語が設定されている

+5
タグ

1つ以上のタグが設定されている

+5

Reviews

💬

Reviews coming soon