← Back to list

implementing-io-pipelines
by christian289
ClaudeCode와 함께하는 .NET 개발 튜토리얼
⭐ 1🍴 0📅 Jan 25, 2026
SKILL.md
name: implementing-io-pipelines
description: "Implements high-performance streaming using System.IO.Pipelines in .NET. Use when building network protocols, parsing binary data, or processing large streams efficiently."
.NET Streaming (System.IO.Pipelines)
A guide to the System.IO.Pipelines API for building high-performance I/O pipelines.
Quick Reference: See QUICKREF.md for essential patterns at a glance.
1. Core Concepts
| Concept | Description |
|---|---|
| Pipe | Memory buffer-based read/write pipe |
| PipeReader | Read data from pipe |
| PipeWriter | Write data to pipe |
| ReadOnlySequence<T> | Non-contiguous memory sequence |
2. Advantages
- Zero-copy: Minimizes unnecessary memory copying
- Backpressure control: Speed regulation between producer and consumer
- Memory pooling: Automatic buffer reuse
- Async I/O: Efficient asynchronous processing
3. Basic Usage
using System;
using System.Buffers;
using System.IO;
using System.IO.Pipelines;
using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;
/// <summary>
/// Pumps a <see cref="Stream"/> through an in-memory <see cref="Pipe"/>: one task
/// fills the pipe from the stream while a second task drains it and hands every
/// buffer to <c>ProcessBuffer</c>.
/// </summary>
/// <remarks>
/// <c>ProcessBuffer</c> is not declared inside this class in the tutorial; see the
/// "Processing ReadOnlySequence" section for its definition.
/// </remarks>
public sealed class PipelineProcessor
{
    /// <summary>Smallest buffer requested from the writer for each stream read.</summary>
    private const int MinimumBufferSize = 512;

    /// <summary>
    /// Reads <paramref name="stream"/> to its end and processes everything that was read.
    /// </summary>
    /// <param name="stream">Source stream; reading stops when it returns 0 bytes.</param>
    /// <param name="cancellationToken">Token passed to the stream read and to the pipe operations.</param>
    /// <returns>A task that completes once both the writing and the reading side have finished.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="stream"/> is <c>null</c>.</exception>
    public async Task ProcessAsync(Stream stream, CancellationToken cancellationToken = default)
    {
        // Fail fast: a null stream would otherwise fault the writer task only,
        // leaving the reader waiting for data that never arrives.
        if (stream is null)
        {
            throw new ArgumentNullException(nameof(stream));
        }

        var pipe = new Pipe();

        // Run Writer and Reader concurrently
        Task writing = FillPipeAsync(stream, pipe.Writer, cancellationToken);
        Task reading = ReadPipeAsync(pipe.Reader, cancellationToken);

        await Task.WhenAll(writing, reading);
    }

    /// <summary>
    /// Copies <paramref name="stream"/> into <paramref name="writer"/> until the stream
    /// ends or the reader side reports completion.
    /// </summary>
    private async Task FillPipeAsync(Stream stream, PipeWriter writer, CancellationToken cancellationToken)
    {
        try
        {
            while (true)
            {
                // Acquire buffer from memory pool
                Memory<byte> memory = writer.GetMemory(MinimumBufferSize);

                int bytesRead = await stream.ReadAsync(memory, cancellationToken);
                if (bytesRead == 0)
                {
                    break;
                }

                // Notify bytes written
                writer.Advance(bytesRead);

                // Flush data and notify Reader
                FlushResult result = await writer.FlushAsync(cancellationToken);
                if (result.IsCompleted)
                {
                    // The reader has completed; further writes would never be consumed.
                    break;
                }
            }
        }
        catch (Exception ex)
        {
            // Complete with the error so the reader is released instead of
            // waiting forever for data that will never be flushed.
            await writer.CompleteAsync(ex);
            throw;
        }

        // Signal write completion
        await writer.CompleteAsync();
    }

    /// <summary>
    /// Drains <paramref name="reader"/>, passing each available buffer to
    /// <c>ProcessBuffer</c>, until the writer side has completed.
    /// </summary>
    private async Task ReadPipeAsync(PipeReader reader, CancellationToken cancellationToken)
    {
        try
        {
            while (true)
            {
                ReadResult result = await reader.ReadAsync(cancellationToken);
                ReadOnlySequence<byte> buffer = result.Buffer;

                // Process buffer
                ProcessBuffer(buffer);

                // Notify consumption up to processed position
                reader.AdvanceTo(buffer.End);

                if (result.IsCompleted)
                {
                    break;
                }
            }
        }
        catch (Exception ex)
        {
            // Complete with the error so the writer observes a completed reader
            // on its next flush instead of stalling on backpressure.
            await reader.CompleteAsync(ex);
            throw;
        }

        // Signal read completion
        await reader.CompleteAsync();
    }
}
4. Line-by-Line Parsing
/// <summary>
/// Reads from <paramref name="reader"/> until the writer side completes, passing every
/// newline-terminated line found by <c>TryReadLine</c> to <c>ProcessLine</c>.
/// </summary>
/// <param name="reader">Pipe reader to drain; it is completed before this method returns or throws.</param>
/// <remarks>
/// Only lines terminated by <c>'\n'</c> are processed. Bytes after the last newline are
/// left unconsumed when the pipe completes.
/// </remarks>
private async Task ReadLinesAsync(PipeReader reader)
{
    try
    {
        while (true)
        {
            ReadResult result = await reader.ReadAsync();
            ReadOnlySequence<byte> buffer = result.Buffer;

            // TryReadLine advances `buffer` past each line it returns.
            while (TryReadLine(ref buffer, out ReadOnlySequence<byte> line))
            {
                ProcessLine(line);
            }

            // Notify unprocessed data position: everything before buffer.Start is
            // consumed; everything up to buffer.End was examined, so the next
            // ReadAsync waits for more data rather than returning the same bytes.
            reader.AdvanceTo(buffer.Start, buffer.End);

            if (result.IsCompleted)
            {
                break;
            }
        }
    }
    catch (Exception ex)
    {
        // Complete with the error so the writer is told the reader is gone
        // instead of stalling on backpressure.
        await reader.CompleteAsync(ex);
        throw;
    }

    await reader.CompleteAsync();
}
/// <summary>
/// Splits the first <c>'\n'</c>-terminated line off the front of <paramref name="buffer"/>.
/// </summary>
/// <param name="buffer">
/// Data to scan. On success it is replaced by the data that follows the newline;
/// on failure it is left untouched.
/// </param>
/// <param name="line">
/// The bytes before the newline (the newline itself is excluded), or <c>default</c>
/// when no newline was found.
/// </param>
/// <returns><c>true</c> if a newline was found; otherwise <c>false</c>.</returns>
private bool TryReadLine(
    ref ReadOnlySequence<byte> buffer,
    out ReadOnlySequence<byte> line)
{
    SequencePosition? newline = buffer.PositionOf((byte)'\n');

    if (newline.HasValue)
    {
        SequencePosition lineEnd = newline.Value;

        // Everything in front of the newline is the line.
        line = buffer.Slice(0, lineEnd);

        // Skip one byte (the newline) to reach the start of the remaining data.
        buffer = buffer.Slice(buffer.GetPosition(1, lineEnd));
        return true;
    }

    line = default;
    return false;
}
5. Processing ReadOnlySequence
/// <summary>
/// Passes the contents of <paramref name="buffer"/> to <c>ProcessSpan</c>, one
/// contiguous span at a time.
/// </summary>
/// <param name="buffer">Sequence to process; it may consist of one or several segments.</param>
private void ProcessBuffer(ReadOnlySequence<byte> buffer)
{
    if (!buffer.IsSingleSegment)
    {
        // Several segments: walk them in order.
        foreach (ReadOnlyMemory<byte> chunk in buffer)
        {
            ProcessSpan(chunk.Span);
        }

        return;
    }

    // One segment: the first span is the entire buffer.
    ProcessSpan(buffer.FirstSpan);
}
6. Network I/O Integration
/// <summary>
/// Connects <paramref name="socket"/> to a new <see cref="Pipe"/>: received bytes are
/// written into the pipe while <c>ProcessAsync</c> consumes them from the reader end.
/// </summary>
/// <param name="socket">Socket to receive from.</param>
/// <returns>A task that completes when both the receiving and the processing side have finished.</returns>
public async Task ProcessSocketAsync(Socket socket)
{
    Pipe socketPipe = new Pipe();

    // Start the producer first, then the consumer; both run concurrently.
    // NOTE(review): the ProcessAsync(PipeReader) overload is not shown in this guide - confirm its contract.
    Task receiveTask = ReceiveAsync(socket, socketPipe.Writer);
    Task processTask = ProcessAsync(socketPipe.Reader);

    await Task.WhenAll(receiveTask, processTask);
}
/// <summary>
/// Receives from <paramref name="socket"/> into <paramref name="writer"/> until the
/// socket returns 0 bytes or the reader side reports completion.
/// </summary>
/// <param name="socket">Socket to receive from.</param>
/// <param name="writer">Pipe writer to fill; it is completed before this method returns or throws.</param>
private async Task ReceiveAsync(Socket socket, PipeWriter writer)
{
    // Smallest buffer requested from the writer for each receive call.
    const int minimumBufferSize = 4096;

    try
    {
        while (true)
        {
            Memory<byte> memory = writer.GetMemory(minimumBufferSize);

            int bytesReceived = await socket.ReceiveAsync(
                memory,
                SocketFlags.None);
            if (bytesReceived == 0)
            {
                break;
            }

            // Commit the received bytes, then make them visible to the reader.
            writer.Advance(bytesReceived);

            FlushResult result = await writer.FlushAsync();
            if (result.IsCompleted)
            {
                // The reader has completed; further data would never be consumed.
                break;
            }
        }
    }
    catch (Exception ex)
    {
        // Complete with the error so the reader is released instead of
        // waiting forever for data that will never be flushed.
        await writer.CompleteAsync(ex);
        throw;
    }

    await writer.CompleteAsync();
}
7. PipeOptions Configuration
// Writer pause threshold: the writer is paused once this many bytes are buffered.
const long pauseThresholdBytes = 64 * 1024;

// Writer resume threshold: the writer resumes once buffered bytes drop to this level.
const long resumeThresholdBytes = 32 * 1024;

// Minimum segment size requested from the memory pool.
const int segmentSizeBytes = 4096;

PipeOptions pipeOptions = new PipeOptions(
    pool: MemoryPool<byte>.Shared,              // Memory pool
    readerScheduler: PipeScheduler.ThreadPool,  // Reader scheduler
    writerScheduler: PipeScheduler.ThreadPool,  // Writer scheduler
    pauseWriterThreshold: pauseThresholdBytes,
    resumeWriterThreshold: resumeThresholdBytes,
    minimumSegmentSize: segmentSizeBytes,
    useSynchronizationContext: false);

Pipe pipe = new Pipe(pipeOptions);
8. Required NuGet Package
<ItemGroup>
<!-- Original note: included in the BCL for .NET Core 3.0+. -->
<!-- NOTE(review): if the target framework already ships System.IO.Pipelines, this explicit package reference may be unnecessary; confirm for your target framework. -->
<PackageReference Include="System.IO.Pipelines" Version="9.0.*" />
</ItemGroup>
9. Important Notes
AdvanceTo Call Required
// AdvanceTo must be called after every ReadAsync.
ReadResult result = await reader.ReadAsync();
// ... processing ...
// `consumed` and `examined` are not declared in this fragment; they stand for the
// positions worked out during processing (compare AdvanceTo(buffer.Start, buffer.End)
// in the line-by-line parsing example).
reader.AdvanceTo(consumed, examined);
Buffer Lifetime
// ❌ Bad example: keeping a reference to the buffer returned by ReadAsync
ReadOnlySequence<byte> saved;
var result = await reader.ReadAsync();
saved = result.Buffer; // Dangerous! The buffer is invalidated once AdvanceTo is called
// ✅ Good example: copy the data you need before calling AdvanceTo
var copy = result.Buffer.ToArray();
reader.AdvanceTo(result.Buffer.End);
Completion Calls
// CompleteAsync must be called on both ends of the pipe: the writer when it has
// finished writing, and the reader when it has finished reading.
await writer.CompleteAsync();
await reader.CompleteAsync();
10. References
Score
Total Score
65/100
Based on repository quality metrics
✓SKILL.md
SKILL.mdファイルが含まれている
+20
✓LICENSE
ライセンスが設定されている
+10
○説明文
100文字以上の説明がある
0/10
○人気
GitHub Stars 100以上
0/15
✓最近の活動
1ヶ月以内に更新
+10
○フォーク
10回以上フォークされている
0/5
✓Issue管理
オープンIssueが50未満
+5
✓言語
プログラミング言語が設定されている
+5
✓タグ
1つ以上のタグが設定されている
+5
Reviews
💬
Reviews coming soon
