
effect-patterns-streams-sinks
by PaulJPhilp
A community-driven knowledge base of practical patterns for Effect-TS.
SKILL.md
name: effect-patterns-streams-sinks
description: Effect-TS patterns for Streams Sinks. Use when working with streams sinks in Effect-TS applications.
Effect-TS Patterns: Streams Sinks
This skill provides 6 curated Effect-TS patterns for streams sinks. Use this skill when working on tasks related to:
- streams sinks
- Best practices in Effect-TS applications
- Real-world patterns and solutions
🟡 Intermediate Patterns
Sink Pattern 1: Batch Insert Stream Records into Database
Rule: Batch stream records before database operations to improve throughput and reduce transaction overhead.
Good Example:
This example demonstrates streaming user records from a paginated API and batching them for efficient database insertion.
import { Effect, Stream, Sink, Chunk, Option } from "effect";
interface User {
readonly id: number;
readonly name: string;
readonly email: string;
}
interface PaginatedResponse {
readonly users: User[];
readonly nextPage: number | null;
}
// Mock API that returns paginated users
const fetchUserPage = (
page: number
): Effect.Effect<PaginatedResponse> =>
Effect.succeed(
page < 10
? {
users: Array.from({ length: 50 }, (_, i) => ({
id: page * 50 + i,
name: `User ${page * 50 + i}`,
email: `user${page * 50 + i}@example.com`,
})),
nextPage: page + 1,
}
: { users: [], nextPage: null }
).pipe(Effect.delay("10 millis"));
// Mock database insert that takes a batch of users
const insertUserBatch = (
users: readonly User[]
): Effect.Effect<number> =>
Effect.sync(() => {
console.log(`Inserting batch of ${users.length} users`);
return users.length;
}).pipe(Effect.delay("50 millis"));
// Create a stream of users from paginated API
const userStream: Stream.Stream<User> = Stream.paginateChunkEffect(0, (page) =>
  fetchUserPage(page).pipe(
    Effect.map(
      (response) =>
        [
          Chunk.fromIterable(response.users),
          response.nextPage !== null
            ? Option.some(response.nextPage)
            : Option.none<number>(),
        ] as const
    )
  )
);
// Sink that inserts each incoming batch and accumulates the total count
const batchInsertSink = Sink.foldLeftEffect(
  0,
  (count: number, batch: Chunk.Chunk<User>) =>
    insertUserBatch(Chunk.toReadonlyArray(batch)).pipe(
      Effect.map((inserted) => count + inserted)
    )
);
// Group the stream into batches of 100 and run it with the insert sink
const program = Effect.gen(function* () {
  const totalInserted = yield* userStream.pipe(
    Stream.grouped(100),
    Stream.run(batchInsertSink)
  );
  console.log(`Total users inserted: ${totalInserted}`);
});
Effect.runPromise(program);
This pattern:
- Creates a stream of users from a paginated API
- Groups the stream into chunks of 100 users with Stream.grouped
- Inserts each chunk into the database in a single operation
- Tracks the total count of inserted records
The batching happens automatically: the stream emits a chunk once 100 elements have accumulated (or the stream ends), and the sink processes each complete batch.
Rationale:
When consuming a stream of records to persist in a database, collect them into batches using Sink before inserting. This reduces the number of database round-trips and transaction overhead, improving overall throughput significantly.
Inserting records one-by-one is inefficient:
- Each insert is a separate database call (network latency, connection overhead)
- Each insert may be a separate transaction (ACID overhead)
- Resource contention and connection pool exhaustion at scale
Batching solves this by:
- Grouping N records into a single bulk insert operation
- Amortizing database overhead across multiple records
- Maintaining throughput even under backpressure
- Enabling efficient transaction semantics for the entire batch
For example, inserting 10,000 records one-by-one might take 100 seconds. Batching in groups of 100 might take just 2-3 seconds.
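The batching can also live entirely in the sink layer. A minimal alternative sketch, reusing the userStream and insertUserBatch helpers and the imports from the example above: Sink.collectAllN collects up to N elements into a chunk, and Stream.transduce applies that sink repeatedly across the stream.
// Sketch: batch via a sink instead of Stream.grouped. Sink.collectAllN(100)
// collects up to 100 users into a Chunk; Stream.transduce applies it
// repeatedly, turning Stream<User> into Stream<Chunk<User>>.
const batchedViaTransduce = userStream.pipe(
  Stream.transduce(Sink.collectAllN<User>(100)),
  Stream.mapEffect((batch) => insertUserBatch(Chunk.toReadonlyArray(batch))),
  Stream.runFold(0, (total, inserted) => total + inserted)
);
// Usage: const totalInserted = yield* batchedViaTransduce;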
Sink Pattern 2: Write Stream Events to Event Log
Rule: Append stream events to an event log with metadata to maintain a complete, ordered record of what happened.
Good Example:
This example demonstrates an event sourcing pattern where a user account stream of events is appended to an event log with metadata.
import { Effect, Stream, Sink, DateTime, Data } from "effect";
// Event types
type AccountEvent =
| AccountCreated
| MoneyDeposited
| MoneyWithdrawn
| AccountClosed;
class AccountCreated extends Data.TaggedClass("AccountCreated")<{
  readonly accountId: string;
  readonly owner: string;
  readonly initialBalance: number;
}> {}
class MoneyDeposited extends Data.TaggedClass("MoneyDeposited")<{
  readonly accountId: string;
  readonly amount: number;
}> {}
class MoneyWithdrawn extends Data.TaggedClass("MoneyWithdrawn")<{
  readonly accountId: string;
  readonly amount: number;
}> {}
class AccountClosed extends Data.TaggedClass("AccountClosed")<{
  readonly accountId: string;
}> {}
// Event envelope with metadata
interface StoredEvent {
readonly eventId: string; // Unique identifier per event
readonly eventType: string; // Type of event
readonly aggregateId: string; // What this event is about
readonly aggregateType: string; // What kind of thing (Account)
readonly data: any; // Event payload
readonly metadata: {
readonly timestamp: number;
readonly version: number; // Position in log
readonly causationId?: string; // What caused this
};
}
// Mock event log that appends events
const eventLog: StoredEvent[] = [];
let eventVersion = 0;
const appendToEventLog = (
event: AccountEvent,
aggregateId: string
): Effect.Effect<StoredEvent> =>
Effect.gen(function* () {
const now = yield* DateTime.now;
const storedEvent: StoredEvent = {
eventId: `evt-${eventVersion}-${Date.now()}`,
eventType: event._tag,
aggregateId,
aggregateType: "Account",
data: event,
metadata: {
timestamp: DateTime.toEpochMillis(now),
version: ++eventVersion,
},
};
// Append to log (simulated)
eventLog.push(storedEvent);
console.log(
`[v${storedEvent.metadata.version}] ${storedEvent.eventType}: ${aggregateId}`
);
return storedEvent;
});
// Simulate a stream of events from various account operations
const accountEvents = Stream.fromIterable<readonly [string, AccountEvent]>([
[
"acc-1",
new AccountCreated({
accountId: "acc-1",
owner: "Alice",
initialBalance: 1000,
}),
],
["acc-1", new MoneyDeposited({ accountId: "acc-1", amount: 500 })],
["acc-1", new MoneyWithdrawn({ accountId: "acc-1", amount: 200 })],
[
"acc-2",
new AccountCreated({
accountId: "acc-2",
owner: "Bob",
initialBalance: 2000,
}),
],
["acc-2", new MoneyDeposited({ accountId: "acc-2", amount: 1000 })],
["acc-1", new AccountClosed({ accountId: "acc-1" })],
]);
// Sink that appends each event to the log
const eventLogSink = Sink.foldLeftEffect(
  0,
  (count: number, [aggregateId, event]: readonly [string, AccountEvent]) =>
    appendToEventLog(event, aggregateId).pipe(Effect.map(() => count + 1))
);
// Run the stream and append all events
const program = Effect.gen(function* () {
const totalEvents = yield* accountEvents.pipe(Stream.run(eventLogSink));
console.log(`\nTotal events appended: ${totalEvents}`);
console.log(`\nEvent log contents:`);
eventLog.forEach((event) => {
console.log(` [v${event.metadata.version}] ${event.eventType}`);
});
});
Effect.runPromise(program);
This pattern:
- Defines event types as tagged data classes (AccountCreated, MoneyDeposited, etc.)
- Creates event envelopes with metadata (timestamp, version, causation)
- Streams events from various sources
- Appends to log with proper versioning and ordering
- Maintains history for reconstruction and audit
Rationale:
When consuming a stream of events that represent changes in your system, append each event to an event log using Sink. Event logs provide immutable, ordered records that enable event sourcing, audit trails, and temporal queries.
Event logs are foundational to many patterns:
- Event Sourcing: Instead of storing current state, store the sequence of events that led to it
- Audit Trails: Complete, tamper-proof record of who did what and when
- Temporal Queries: Reconstruct state at any point in time
- Consistency: Single source of truth for what happened
- Replay: Rebuild state or test changes by replaying events
Unlike batch inserts which are transactional, event logs are append-only. Each event is immutable once written. This simplicity enables:
- Fast appends (no updates, just sequential writes)
- Natural ordering (events in write order)
- Easy distribution (replicate the log)
- Strong consistency (events are facts that don't change)
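The replay idea can be sketched directly against the example above: folding the stored events back into per-account balances reconstructs current state from history. This sketch reuses the StoredEvent envelope, AccountEvent union, and eventLog array defined earlier.
// Sketch: rebuild per-account balances by replaying the event log.
const rebuildBalances = (events: ReadonlyArray<StoredEvent>) =>
  Stream.fromIterable(events).pipe(
    Stream.runFold(new Map<string, number>(), (balances, stored) => {
      const event = stored.data as AccountEvent;
      const next = new Map(balances);
      switch (event._tag) {
        case "AccountCreated":
          next.set(event.accountId, event.initialBalance);
          break;
        case "MoneyDeposited":
          next.set(event.accountId, (next.get(event.accountId) ?? 0) + event.amount);
          break;
        case "MoneyWithdrawn":
          next.set(event.accountId, (next.get(event.accountId) ?? 0) - event.amount);
          break;
        case "AccountClosed":
          next.delete(event.accountId);
          break;
      }
      return next;
    })
  );
// Usage: const balances = yield* rebuildBalances(eventLog);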
Sink Pattern 4: Send Stream Records to Message Queue
Rule: Stream records to message queues with proper batching and acknowledgment for reliable distributed data flow.
Good Example:
This example demonstrates streaming sensor readings and publishing them to a message queue with topic-based partitioning.
import { Effect, Stream, Sink, Chunk } from "effect";
interface SensorReading {
readonly sensorId: string;
readonly location: string;
readonly temperature: number;
readonly humidity: number;
readonly timestamp: number;
}
// Mock message queue publisher
interface QueuePublisher {
readonly publish: (
topic: string,
partition: string,
messages: readonly SensorReading[]
) => Effect.Effect<{ acknowledged: number; messageIds: string[] }>;
}
// Create a mock queue publisher
const createMockPublisher = (): QueuePublisher => {
const publishedMessages: Record<string, SensorReading[]> = {};
return {
publish: (topic, partition, messages) =>
Effect.gen(function* () {
const key = `${topic}/${partition}`;
publishedMessages[key] = [
...(publishedMessages[key] ?? []),
...messages,
];
const messageIds = Array.from({ length: messages.length }, (_, i) =>
`msg-${Date.now()}-${i}`
);
console.log(
`Published ${messages.length} messages to ${key} (batch)`
);
return { acknowledged: messages.length, messageIds };
}),
};
};
// Determine the partition key based on sensor location
const getPartitionKey = (reading: SensorReading): string =>
reading.location; // Route by location for data locality
// Simulate a stream of sensor readings
const sensorStream: Stream.Stream<SensorReading> = Stream.fromIterable([
{
sensorId: "temp-1",
location: "warehouse-a",
temperature: 22.5,
humidity: 45,
timestamp: Date.now(),
},
{
sensorId: "temp-2",
location: "warehouse-b",
temperature: 21.0,
humidity: 50,
timestamp: Date.now() + 100,
},
{
sensorId: "temp-3",
location: "warehouse-a",
temperature: 22.8,
humidity: 46,
timestamp: Date.now() + 200,
},
{
sensorId: "temp-4",
location: "warehouse-c",
temperature: 20.5,
humidity: 55,
timestamp: Date.now() + 300,
},
{
sensorId: "temp-5",
location: "warehouse-b",
temperature: 21.2,
humidity: 51,
timestamp: Date.now() + 400,
},
{
sensorId: "temp-6",
location: "warehouse-a",
temperature: 23.0,
humidity: 47,
timestamp: Date.now() + 500,
},
]);
// Create a sink that batches and publishes to message queue
const createQueuePublishSink = (
  publisher: QueuePublisher,
  topic: string,
  batchSize: number = 100
) =>
  Sink.foldLeftEffect(
    { batches: new Map<string, SensorReading[]>(), totalPublished: 0 },
    (state, reading: SensorReading) =>
      Effect.gen(function* () {
        const partition = getPartitionKey(reading);
        const batch = state.batches.get(partition) ?? [];
        const newBatch = [...batch, reading];
        if (newBatch.length >= batchSize) {
          // Batch is full, publish it
          const result = yield* publisher.publish(topic, partition, newBatch);
          const newBatches = new Map(state.batches);
          newBatches.delete(partition);
          return {
            batches: newBatches,
            totalPublished: state.totalPublished + result.acknowledged,
          };
        }
        // Not full yet: stash the batch and continue
        const newBatches = new Map(state.batches);
        newBatches.set(partition, newBatch);
        return { ...state, batches: newBatches };
      })
  ).pipe(
    // Flush any remaining partial batches when the stream ends
    Sink.mapEffect((state) =>
      Effect.gen(function* () {
        let finalCount = state.totalPublished;
        for (const [partition, batch] of state.batches) {
          if (batch.length > 0) {
            const result = yield* publisher.publish(topic, partition, batch);
            finalCount += result.acknowledged;
          }
        }
        return finalCount;
      })
    )
  );
// Run the stream and publish to queue
const program = Effect.gen(function* () {
const publisher = createMockPublisher();
const topic = "sensor-readings";
const published = yield* sensorStream.pipe(
Stream.run(createQueuePublishSink(publisher, topic, 50)) // Batch size of 50
);
console.log(
`\nTotal messages published to queue: ${published}`
);
});
Effect.runPromise(program);
This pattern:
- Groups readings by partition (location) for data locality
- Batches records before publishing (50 at a time)
- Publishes batches to the queue with partition key
- Flushes partial batches when stream ends
- Tracks acknowledgments from the queue
Rationale:
When consuming a stream of events that need to be distributed to other systems, use Sink to publish them to a message queue. Message queues provide reliable, scalable distribution with guarantees like ordering and exactly-once semantics.
Message queues are the backbone of event-driven architectures:
- Decoupling: Producers don't wait for consumers
- Scalability: Multiple subscribers can consume independently
- Durability: Messages persist even if subscribers are down
- Ordering: Maintain event sequence (per partition/topic)
- Reliability: Acknowledgments and retries ensure no message loss
Unlike direct writes which block, queue publishing is asynchronous and enables:
- High-throughput publishing (batch multiple records per operation)
- Backpressure handling (queue manages flow)
- Multi-subscriber patterns (fan-out)
- Dead letter queues for error handling
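When per-partition batching state is not required, the batching half of this pattern can be sketched with Stream.groupedWithin, which flushes on either a size or a time bound. The sketch below reuses publisher, topic, and sensorStream from the example; the 1-second window and the "default" partition key are assumptions for illustration.
// Sketch: publish batches of up to 100 readings, or whatever has arrived
// within 1 second, whichever comes first. The "default" partition key is a
// placeholder for a real routing decision.
const publishInWindows = sensorStream.pipe(
  Stream.groupedWithin(100, "1 second"),
  Stream.mapEffect((batch) =>
    publisher.publish(topic, "default", Chunk.toReadonlyArray(batch))
  ),
  Stream.runFold(0, (total, result) => total + result.acknowledged)
);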
Sink Pattern 5: Fall Back to Alternative Sink on Failure
Rule: Implement fallback sinks to handle failures gracefully and ensure data is persisted even when the primary destination is unavailable.
Good Example:
This example demonstrates a system that tries to write order records to a fast in-memory cache first, falls back to database if cache fails, and falls back to a dead letter file if database fails.
import { Effect, Stream, Sink, Chunk, Either, Data } from "effect";
interface Order {
readonly orderId: string;
readonly customerId: string;
readonly total: number;
readonly timestamp: number;
}
class CacheSinkError extends Data.TaggedError("CacheSinkError")<{
readonly reason: string;
}> {}
class DatabaseSinkError extends Data.TaggedError("DatabaseSinkError")<{
readonly reason: string;
}> {}
// Mock in-memory cache sink (fast but limited)
const createCacheSink = () => {
  const cache: Order[] = [];
  const MAX_CACHE_SIZE = 1000;
  return Sink.foldLeftEffect(0, (count: number, order: Order) =>
    Effect.gen(function* () {
      if (cache.length >= MAX_CACHE_SIZE) {
        return yield* Effect.fail(
          new CacheSinkError({
            reason: `Cache full (${cache.length}/${MAX_CACHE_SIZE})`,
          })
        );
      }
      cache.push(order);
      console.log(`[CACHE] Cached order ${order.orderId}`);
      return count + 1;
    })
  );
};
// Mock database sink (slower but reliable)
const createDatabaseSink = () => {
  const orders: Order[] = [];
  return Sink.foldLeftEffect(0, (count: number, order: Order) =>
    Effect.gen(function* () {
      // Simulate occasional database failures
      if (Math.random() < 0.1) {
        return yield* Effect.fail(
          new DatabaseSinkError({ reason: "Connection timeout" })
        );
      }
      orders.push(order);
      console.log(`[DATABASE] Persisted order ${order.orderId}`);
      return count + 1;
    })
  );
};
// Mock file sink (always works but slow)
const createDeadLetterSink = () => {
  const deadLetters: Order[] = [];
  return Sink.foldLeftEffect(0, (count: number, order: Order) =>
    Effect.sync(() => {
      deadLetters.push(order);
      console.log(
        `[DEAD-LETTER] Wrote order ${order.orderId} to dead letter file`
      );
      return count + 1;
    })
  );
};
// Create a fallback sink that tries cache -> database -> dead letter per order
const createFallbackSink = () =>
  Sink.foldLeftEffect(
    { cached: 0, persisted: 0, deadLetters: 0 },
    (state, order: Order) =>
      Effect.gen(function* () {
        // Try cache first
        const cacheResult = yield* Stream.make(order).pipe(
          Stream.run(createCacheSink()),
          Effect.either
        );
        if (Either.isRight(cacheResult)) {
          return { ...state, cached: state.cached + cacheResult.right };
        }
        console.log(
          `[FALLBACK] Cache failed (${cacheResult.left.reason}), trying database`
        );
        // Cache failed, try database
        const dbResult = yield* Stream.make(order).pipe(
          Stream.run(createDatabaseSink()),
          Effect.either
        );
        if (Either.isRight(dbResult)) {
          return { ...state, persisted: state.persisted + dbResult.right };
        }
        console.log(
          `[FALLBACK] Database failed (${dbResult.left.reason}), falling back to dead letter`
        );
        // Database failed, use the dead letter sink
        const dlResult = yield* Stream.make(order).pipe(
          Stream.run(createDeadLetterSink())
        );
        return { ...state, deadLetters: state.deadLetters + dlResult };
      })
  ).pipe(
    // Report where the data ended up once the stream completes
    Sink.mapEffect((state) =>
      Effect.sync(() => {
        console.log(`\n[SUMMARY]`);
        console.log(`  Cached: ${state.cached}`);
        console.log(`  Persisted: ${state.persisted}`);
        console.log(`  Dead Letter: ${state.deadLetters}`);
        return state;
      })
    )
  );
// Simulate a stream of orders
const orderStream: Stream.Stream<Order> = Stream.fromIterable([
{
orderId: "order-1",
customerId: "cust-1",
total: 99.99,
timestamp: Date.now(),
},
{
orderId: "order-2",
customerId: "cust-2",
total: 149.99,
timestamp: Date.now() + 100,
},
{
orderId: "order-3",
customerId: "cust-1",
total: 49.99,
timestamp: Date.now() + 200,
},
{
orderId: "order-4",
customerId: "cust-3",
total: 199.99,
timestamp: Date.now() + 300,
},
{
orderId: "order-5",
customerId: "cust-2",
total: 89.99,
timestamp: Date.now() + 400,
},
]);
// Run the stream with fallback sink
const program = Effect.gen(function* () {
const result = yield* orderStream.pipe(Stream.run(createFallbackSink()));
console.log(`\nTotal orders processed: ${result.cached + result.persisted + result.deadLetters}`);
});
Effect.runPromise(program);
This pattern:
- Tries cache first (fast, limited capacity)
- Falls back to database if cache is full
- Falls back to dead letter if database fails
- Tracks which sink was used for each record
- Reports summary of where data went
Rationale:
When consuming a stream to a primary destination that might fail, wrap it in a fallback pattern. If the primary sink fails, automatically redirect the stream to an alternative sink. This enables progressive degradation where the system degrades gracefully rather than failing completely.
Production systems need resilience:
- Primary failures: Database down, network timeout, quota exceeded
- Progressive degradation: Keep the system running, even at reduced capacity
- No data loss: Fallback ensures data is persisted somewhere
- Operational flexibility: Choose fallback based on failure type
- Monitoring: Track when fallbacks are used to alert operators
Without fallback patterns:
- System fails when primary destination fails
- Data is lost if primary is unavailable
- No clear signal that degradation occurred
With fallback sinks:
- Stream continues even when primary fails
- Data is safely persisted to alternative
- Clear audit trail of which sink was used
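The example above falls back per record so it can report where each order landed. For whole-sink fallback, Sink also provides an orElse combinator; a minimal sketch, where both sinks below are illustrative stand-ins rather than the ones defined above:
// Sketch: if the primary sink fails while consuming the stream, Sink.orElse
// switches to the fallback sink. Both sinks here are stand-ins for illustration.
const primarySink = Sink.forEach((order: Order) =>
  Effect.fail(new DatabaseSinkError({ reason: "primary unavailable" }))
);
const fallbackSink = Sink.forEach((order: Order) =>
  Effect.sync(() => console.log(`[FALLBACK] stored ${order.orderId}`))
);
const resilientSink = primarySink.pipe(Sink.orElse(() => fallbackSink));
// Usage: yield* orderStream.pipe(Stream.run(resilientSink));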
Sink Pattern 6: Retry Failed Stream Operations
Rule: Implement retry strategies in sinks to handle transient failures and improve resilience without manual intervention.
Good Example:
This example demonstrates retrying database writes with exponential backoff, tracking attempts, and falling back on permanent failures.
import { Effect, Stream, Sink, Duration, Either } from "effect";
interface UserRecord {
readonly userId: string;
readonly name: string;
readonly email: string;
}
class WriteError extends Error {
readonly isTransient: boolean;
constructor(message: string, isTransient: boolean = true) {
super(message);
this.name = "WriteError";
this.isTransient = isTransient;
}
}
// Mock database that occasionally fails
const database = {
failureRate: 0.3, // 30% transient failure rate
permanentFailureRate: 0.05, // 5% permanent failure rate
  insertUser: (user: UserRecord): Effect.Effect<void, WriteError> =>
    Effect.gen(function* () {
      const rand = Math.random();
      // Permanent failure (e.g., constraint violation)
      if (rand < database.permanentFailureRate) {
        return yield* Effect.fail(
          new WriteError(`Permanent: User ${user.userId} already exists`, false)
        );
      }
      // Transient failure (e.g., connection timeout)
      if (rand < database.permanentFailureRate + database.failureRate) {
        return yield* Effect.fail(
          new WriteError(`Transient: Connection timeout writing ${user.userId}`, true)
        );
      }
      // Success
      console.log(`✓ Wrote user ${user.userId}`);
    }),
};
// Retry configuration
interface RetryConfig {
readonly maxAttempts: number;
readonly initialDelayMs: number;
readonly maxDelayMs: number;
readonly backoffFactor: number;
}
const defaultRetryConfig: RetryConfig = {
maxAttempts: 5,
initialDelayMs: 100, // Start with 100ms
maxDelayMs: 5000, // Cap at 5 seconds
backoffFactor: 2, // Double each time
};
// Result tracking
interface OperationResult {
readonly succeeded: number;
readonly transientFailures: number;
readonly permanentFailures: number;
readonly detailedStats: Array<{
readonly userId: string;
readonly attempts: number;
readonly status: "success" | "transient-failed" | "permanent-failed";
}>;
}
// Create a sink with retry logic
const createRetrySink = (config: RetryConfig) => {
  const initialState: OperationResult = {
    succeeded: 0,
    transientFailures: 0,
    permanentFailures: 0,
    detailedStats: [],
  };
  return Sink.foldLeftEffect(initialState, (state: OperationResult, user: UserRecord) =>
    Effect.gen(function* () {
      // Retry loop with exponential backoff and jitter
      for (let attempt = 1; attempt <= config.maxAttempts; attempt++) {
        const result = yield* Effect.either(database.insertUser(user));
        if (Either.isRight(result)) {
          // Success!
          console.log(
            `[${user.userId}] Success on attempt ${attempt}/${config.maxAttempts}`
          );
          return {
            ...state,
            succeeded: state.succeeded + 1,
            detailedStats: [
              ...state.detailedStats,
              { userId: user.userId, attempts: attempt, status: "success" as const },
            ],
          };
        }
        const error = result.left;
        if (!error.isTransient) {
          // Permanent failure, don't retry
          console.log(`[${user.userId}] Permanent failure: ${error.message}`);
          return {
            ...state,
            permanentFailures: state.permanentFailures + 1,
            detailedStats: [
              ...state.detailedStats,
              { userId: user.userId, attempts: attempt, status: "permanent-failed" as const },
            ],
          };
        }
        // Transient failure, retry if attempts remain
        if (attempt < config.maxAttempts) {
          // Calculate delay with exponential backoff, capped at maxDelayMs
          let delayMs = config.initialDelayMs * Math.pow(config.backoffFactor, attempt - 1);
          delayMs = Math.min(delayMs, config.maxDelayMs);
          // Add jitter (±10%)
          const jitter = delayMs * 0.1;
          delayMs = delayMs + (Math.random() - 0.5) * 2 * jitter;
          console.log(
            `[${user.userId}] Transient failure (attempt ${attempt}/${config.maxAttempts}): ${error.message}`
          );
          console.log(`  Retrying in ${Math.round(delayMs)}ms...`);
          yield* Effect.sleep(Duration.millis(Math.round(delayMs)));
        }
      }
      // All retries exhausted
      console.log(`[${user.userId}] Failed after ${config.maxAttempts} attempts`);
      return {
        ...state,
        transientFailures: state.transientFailures + 1,
        detailedStats: [
          ...state.detailedStats,
          {
            userId: user.userId,
            attempts: config.maxAttempts,
            status: "transient-failed" as const,
          },
        ],
      };
    })
  ).pipe(
    // Print a summary once the stream completes
    Sink.mapEffect((state) =>
      Effect.sync(() => {
        console.log(`\n[SUMMARY]`);
        console.log(`  Succeeded: ${state.succeeded}`);
        console.log(`  Transient Failures: ${state.transientFailures}`);
        console.log(`  Permanent Failures: ${state.permanentFailures}`);
        console.log(`  Total: ${state.detailedStats.length}`);
        const failed = state.detailedStats.filter((s) => s.status !== "success");
        if (failed.length > 0) {
          console.log(`\n[FAILURES]`);
          failed.forEach((stat) => {
            console.log(`  ${stat.userId}: ${stat.attempts} attempts (${stat.status})`);
          });
        }
        return state;
      })
    )
  );
};
// Simulate a stream of users to insert
const userStream: Stream.Stream<UserRecord> = Stream.fromIterable([
{ userId: "user-1", name: "Alice", email: "alice@example.com" },
{ userId: "user-2", name: "Bob", email: "bob@example.com" },
{ userId: "user-3", name: "Charlie", email: "charlie@example.com" },
{ userId: "user-4", name: "Diana", email: "diana@example.com" },
{ userId: "user-5", name: "Eve", email: "eve@example.com" },
]);
// Run the stream with retry sink
const program = Effect.gen(function* () {
  const result = yield* userStream.pipe(
    Stream.run(createRetrySink(defaultRetryConfig))
  );
  console.log(
    `\nProcessing complete: ${result.succeeded}/${result.detailedStats.length} writes succeeded.`
  );
});
Effect.runPromise(program);
This pattern:
- Attempts operation up to max retries
- Distinguishes transient vs. permanent failures
- Uses exponential backoff to space retries
- Adds jitter to prevent thundering herd
- Tracks detailed stats for monitoring
- Reports summary of outcomes
Rationale:
When consuming a stream to a destination that may experience transient failures (network timeouts, rate limiting, temporary unavailability), wrap the sink operation with a retry policy. Use exponential backoff to avoid overwhelming a recovering system while still recovering quickly.
Transient failures are common in distributed systems:
- Network timeouts: Temporary connectivity issues resolve themselves
- Rate limiting: Service recovers once rate limit window resets
- Temporary unavailability: Services restart or scale up
- Circuit breaker trips: Service recovers after backoff period
Without retry logic:
- Every transient failure causes data loss or stream interruption
- Manual intervention required to restart
- System appears less reliable than it actually is
With intelligent retry logic:
- Automatic recovery from transient failures
- Exponential backoff prevents thundering herd
- Clear visibility into which operations failed permanently
- Data flows continuously despite temporary issues
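The manual loop above keeps the backoff arithmetic visible. The same policy can be expressed declaratively with Effect.retry and a composed Schedule; the sketch below reuses the database mock, userStream, and UserRecord from the example, assumes Schedule is also imported from effect, and shows one reasonable schedule composition rather than the only one.
// Sketch: exponential backoff with jitter, at most 4 retries (5 attempts),
// retrying only while the error is transient.
const retryPolicy = Schedule.exponential("100 millis", 2).pipe(
  Schedule.jittered,
  Schedule.intersect(Schedule.recurs(4)),
  Schedule.whileInput((error: WriteError) => error.isTransient)
);
const insertWithRetry = (user: UserRecord) =>
  database.insertUser(user).pipe(Effect.retry(retryPolicy));
// Usage: yield* userStream.pipe(Stream.mapEffect(insertWithRetry), Stream.runDrain);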
Sink Pattern 3: Write Stream Lines to File
Rule: Write streaming lines to a file efficiently using buffered output and proper resource management.
Good Example:
This example demonstrates streaming log entries and writing them to a file with buffering.
import { Effect, Stream, Sink } from "effect";
import { FileSystem } from "@effect/platform";
import { NodeContext } from "@effect/platform-node";
interface LogEntry {
readonly level: "debug" | "info" | "warn" | "error";
readonly message: string;
readonly timestamp: number;
}
// Format a log entry as a line
const formatLogLine = (entry: LogEntry): string => {
const iso = new Date(entry.timestamp).toISOString();
return `[${iso}] ${entry.level.toUpperCase()}: ${entry.message}`;
};
// Simulate a stream of log entries
const logStream = Stream.fromIterable<LogEntry>([
{ level: "info", message: "Server starting", timestamp: Date.now() },
{ level: "debug", message: "Loading config", timestamp: Date.now() + 100 },
{ level: "info", message: "Connected to database", timestamp: Date.now() + 200 },
{ level: "warn", message: "High memory usage detected", timestamp: Date.now() + 300 },
{ level: "info", message: "Processing request", timestamp: Date.now() + 400 },
{ level: "error", message: "Connection timeout", timestamp: Date.now() + 500 },
{ level: "info", message: "Retrying connection", timestamp: Date.now() + 600 },
{ level: "info", message: "Connection restored", timestamp: Date.now() + 700 },
]);
// Create a file writer sink with buffering around an already-open file handle
const createFileWriteSink = (
  file: FileSystem.File,
  bufferSize: number = 100
) => {
  const encoder = new TextEncoder();
  let buffer: string[] = [];
  // Flush buffered lines to disk
  const flush = Effect.gen(function* () {
    if (buffer.length === 0) return;
    const content = buffer.join("\n") + "\n";
    buffer = [];
    yield* file.writeAll(encoder.encode(content));
  });
  return Sink.foldLeftEffect(0, (count: number, line: string) =>
    Effect.gen(function* () {
      buffer.push(line);
      // Flush when the buffer reaches its size limit
      if (buffer.length >= bufferSize) {
        yield* flush;
      }
      return count + 1;
    })
  ).pipe(
    // Flush any remaining lines when the stream ends; the file handle itself
    // is closed by the scope that opened it
    Sink.mapEffect((count) => Effect.as(flush, count))
  );
};
// Process the log stream
const program = Effect.gen(function* () {
  const fs = yield* FileSystem.FileSystem;
  const filePath = "/tmp/app.log";
  // Clear the file first
  yield* fs.writeFileString(filePath, "");
  // Open the file in append mode inside a scope so it is closed when done
  const written = yield* Effect.scoped(
    Effect.gen(function* () {
      const file = yield* fs.open(filePath, { flag: "a" });
      return yield* logStream.pipe(
        Stream.map(formatLogLine),
        Stream.run(createFileWriteSink(file, 50)) // Buffer 50 lines before flush
      );
    })
  );
  console.log(`Wrote ${written} log lines to ${filePath}`);
  // Read back the file to verify
  const content = yield* fs.readFileString(filePath);
  console.log("\nFile contents:");
  console.log(content);
});
Effect.runPromise(program.pipe(Effect.provide(NodeContext.layer)));
This pattern:
- Opens a file for appending
- Buffers log lines in memory (50 lines before flush)
- Flushes periodically when buffer fills or stream ends
- Closes the file safely using scopes
- Tracks line count for confirmation
Rationale:
When consuming a stream of data to persist as lines in a file, use Sink with a file writer. Buffer the output for efficiency and ensure proper resource cleanup using Effect's scope management.
Writing stream data to files requires:
- Buffering: Writing one line at a time is slow. Buffer multiple lines before flushing to disk
- Efficiency: Reduce system calls and I/O overhead by batching writes
- Resource Management: Ensure file handles are properly closed even on errors
- Ordering: Maintain the order of lines as they appear in the stream
This pattern is essential for:
- Log files and audit trails
- CSV/JSON Line export
- Streaming data archival
- Data pipelines with file intermediates
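For simple cases the platform file system also exposes a byte sink directly, which leaves buffering to the platform. A minimal sketch, reusing the FileSystem and NodeContext imports from the example above (the helper name and the trailing-newline choice are this sketch's own):
// Sketch: stream text lines into a file via the platform byte sink.
const writeLinesToFile = (path: string, lines: Stream.Stream<string>) =>
  Effect.gen(function* () {
    const fs = yield* FileSystem.FileSystem;
    yield* lines.pipe(
      Stream.map((line) => line + "\n"),
      Stream.encodeText, // string -> Uint8Array
      Stream.run(fs.sink(path))
    );
  });
// Usage:
// Effect.runPromise(
//   writeLinesToFile("/tmp/lines.txt", Stream.make("first", "second")).pipe(
//     Effect.provide(NodeContext.layer)
//   )
// );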