Back to list
christian289

implementing-io-pipelines

by christian289

ClaudeCode와 함께하는 .NET 개발 튜토리얼

1🍴 0📅 Jan 25, 2026

SKILL.md


name: implementing-io-pipelines description: "Implements high-performance streaming using System.IO.Pipelines in .NET. Use when building network protocols, parsing binary data, or processing large streams efficiently."

.NET Streaming (System.IO.Pipelines)

A guide for System.IO.Pipelines API for high-performance I/O pipelines.

Quick Reference: See QUICKREF.md for essential patterns at a glance.

1. Core Concepts

ConceptDescription
PipeMemory buffer-based read/write pipe
PipeReaderRead data from pipe
PipeWriterWrite data to pipe
ReadOnlySequence<T>Non-contiguous memory sequence

2. Advantages

  • Zero-copy: Minimizes unnecessary memory copying
  • Backpressure control: Speed regulation between producer and consumer
  • Memory pooling: Automatic buffer reuse
  • Async I/O: Efficient asynchronous processing

3. Basic Usage

using System.IO.Pipelines;

public sealed class PipelineProcessor
{
    public async Task ProcessAsync(Stream stream)
    {
        var pipe = new Pipe();

        // Run Writer and Reader concurrently
        var writing = FillPipeAsync(stream, pipe.Writer);
        var reading = ReadPipeAsync(pipe.Reader);

        await Task.WhenAll(writing, reading);
    }

    private async Task FillPipeAsync(Stream stream, PipeWriter writer)
    {
        const int minimumBufferSize = 512;

        while (true)
        {
            // Acquire buffer from memory pool
            Memory<byte> memory = writer.GetMemory(minimumBufferSize);

            int bytesRead = await stream.ReadAsync(memory);

            if (bytesRead == 0)
                break;

            // Notify bytes written
            writer.Advance(bytesRead);

            // Flush data and notify Reader
            FlushResult result = await writer.FlushAsync();

            if (result.IsCompleted)
                break;
        }

        // Signal write completion
        await writer.CompleteAsync();
    }

    private async Task ReadPipeAsync(PipeReader reader)
    {
        while (true)
        {
            ReadResult result = await reader.ReadAsync();
            ReadOnlySequence<byte> buffer = result.Buffer;

            // Process buffer
            ProcessBuffer(buffer);

            // Notify consumption up to processed position
            reader.AdvanceTo(buffer.End);

            if (result.IsCompleted)
                break;
        }

        // Signal read completion
        await reader.CompleteAsync();
    }
}

4. Line-by-Line Parsing

private async Task ReadLinesAsync(PipeReader reader)
{
    while (true)
    {
        ReadResult result = await reader.ReadAsync();
        ReadOnlySequence<byte> buffer = result.Buffer;

        while (TryReadLine(ref buffer, out ReadOnlySequence<byte> line))
        {
            ProcessLine(line);
        }

        // Notify unprocessed data position
        reader.AdvanceTo(buffer.Start, buffer.End);

        if (result.IsCompleted)
            break;
    }

    await reader.CompleteAsync();
}

private bool TryReadLine(
    ref ReadOnlySequence<byte> buffer,
    out ReadOnlySequence<byte> line)
{
    // Find newline
    SequencePosition? position = buffer.PositionOf((byte)'\n');

    if (position is null)
    {
        line = default;
        return false;
    }

    // Slice up to newline
    line = buffer.Slice(0, position.Value);

    // Move buffer past newline
    buffer = buffer.Slice(buffer.GetPosition(1, position.Value));

    return true;
}

5. Processing ReadOnlySequence

private void ProcessBuffer(ReadOnlySequence<byte> buffer)
{
    if (buffer.IsSingleSegment)
    {
        // Single segment - direct access
        ProcessSpan(buffer.FirstSpan);
    }
    else
    {
        // Multiple segments - iteration required
        foreach (var segment in buffer)
        {
            ProcessSpan(segment.Span);
        }
    }
}

6. Network I/O Integration

public async Task ProcessSocketAsync(Socket socket)
{
    var pipe = new Pipe();

    var writing = ReceiveAsync(socket, pipe.Writer);
    var reading = ProcessAsync(pipe.Reader);

    await Task.WhenAll(writing, reading);
}

private async Task ReceiveAsync(Socket socket, PipeWriter writer)
{
    while (true)
    {
        Memory<byte> memory = writer.GetMemory(4096);

        int bytesReceived = await socket.ReceiveAsync(
            memory,
            SocketFlags.None);

        if (bytesReceived == 0)
            break;

        writer.Advance(bytesReceived);

        FlushResult result = await writer.FlushAsync();

        if (result.IsCompleted)
            break;
    }

    await writer.CompleteAsync();
}

7. PipeOptions Configuration

var pipeOptions = new PipeOptions(
    pool: MemoryPool<byte>.Shared,           // Memory pool
    readerScheduler: PipeScheduler.ThreadPool, // Reader scheduler
    writerScheduler: PipeScheduler.ThreadPool, // Writer scheduler
    pauseWriterThreshold: 64 * 1024,         // Writer pause threshold
    resumeWriterThreshold: 32 * 1024,        // Writer resume threshold
    minimumSegmentSize: 4096,                // Minimum segment size
    useSynchronizationContext: false
);

var pipe = new Pipe(pipeOptions);

8. Required NuGet Package

<ItemGroup>
  <!-- Included in BCL for .NET Core 3.0+ -->
  <PackageReference Include="System.IO.Pipelines" Version="9.0.*" />
</ItemGroup>

9. Important Notes

AdvanceTo Call Required

// Must call AdvanceTo after ReadAsync
ReadResult result = await reader.ReadAsync();
// ... processing ...
reader.AdvanceTo(consumed, examined);

Buffer Lifetime

// ❌ Bad example: Saving buffer after ReadAsync
ReadOnlySequence<byte> saved;
var result = await reader.ReadAsync();
saved = result.Buffer; // Dangerous! Invalidated after AdvanceTo

// ✅ Good example: Copy needed data
var copy = result.Buffer.ToArray();
reader.AdvanceTo(result.Buffer.End);

Completion Calls

// Must call CompleteAsync for both Writer and Reader
await writer.CompleteAsync();
await reader.CompleteAsync();

10. References

Score

Total Score

65/100

Based on repository quality metrics

SKILL.md

SKILL.mdファイルが含まれている

+20
LICENSE

ライセンスが設定されている

+10
説明文

100文字以上の説明がある

0/10
人気

GitHub Stars 100以上

0/15
最近の活動

1ヶ月以内に更新

+10
フォーク

10回以上フォークされている

0/5
Issue管理

オープンIssueが50未満

+5
言語

プログラミング言語が設定されている

+5
タグ

1つ以上のタグが設定されている

+5

Reviews

💬

Reviews coming soon