Back to list
PaulJPhilp

effect-patterns-concurrency

by PaulJPhilp

A community-driven knowledge base of practical patterns for Effect-TS.

614🍴 20📅 Jan 23, 2026

SKILL.md


name: effect-patterns-concurrency description: Effect-TS patterns for Concurrency. Use when working with concurrency in Effect-TS applications.

Effect-TS Patterns: Concurrency

This skill provides 20 curated Effect-TS patterns for concurrency. Use this skill when working on tasks related to:

  • concurrency
  • Best practices in Effect-TS applications
  • Real-world patterns and solutions

🟡 Intermediate Patterns

Race Concurrent Effects for the Fastest Result

Rule: Use Effect.race to get the result from the first of several effects to succeed, automatically interrupting the losers.

Good Example:

A classic use case is checking a fast cache before falling back to a slower database. We can race the cache lookup against the database query.

import { Effect, Option } from "effect";

type User = { id: number; name: string };

// Simulate a slower cache lookup that might find nothing (None)
const checkCache: Effect.Effect<Option.Option<User>> = Effect.succeed(
  Option.none()
).pipe(
  Effect.delay("200 millis") // Made slower so database wins
);

// Simulate a faster database query that will always find the data
const queryDatabase: Effect.Effect<Option.Option<User>> = Effect.succeed(
  Option.some({ id: 1, name: "Paul" })
).pipe(
  Effect.delay("50 millis") // Made faster so it wins the race
);

// Race them. The database should win and return the user data.
const program = Effect.race(checkCache, queryDatabase).pipe(
  // The result of the race is an Option, so we can handle it.
  Effect.flatMap((result: Option.Option<User>) =>
    Option.match(result, {
      onNone: () => Effect.fail("User not found anywhere."),
      onSome: (user) => Effect.succeed(user),
    })
  )
);

// In this case, the database wins the race.
const programWithResults = Effect.gen(function* () {
  try {
    const user = yield* program;
    yield* Effect.log(`User found: ${JSON.stringify(user)}`);
    return user;
  } catch (error) {
    yield* Effect.logError(`Error: ${error}`);
    throw error;
  }
}).pipe(
  Effect.catchAll((error) =>
    Effect.gen(function* () {
      yield* Effect.logError(`Handled error: ${error}`);
      return null;
    })
  )
);

Effect.runPromise(programWithResults);

// Also demonstrate with logging
const programWithLogging = Effect.gen(function* () {
  yield* Effect.logInfo("Starting race between cache and database...");

  try {
    const user = yield* program;
    yield* Effect.logInfo(
      `Success: Found user ${user.name} with ID ${user.id}`
    );
    return user;
  } catch (error) {
    yield* Effect.logInfo("This won't be reached due to Effect error handling");
    return null;
  }
}).pipe(
  Effect.catchAll((error) =>
    Effect.gen(function* () {
      yield* Effect.logInfo(`Handled error: ${error}`);
      return null;
    })
  )
);

Effect.runPromise(programWithLogging);

Anti-Pattern:

Don't use Effect.race if you need the results of all the effects. That is the job of Effect.all. Using race in this scenario will cause you to lose data, as all but one of the effects will be interrupted and their results discarded.

import { Effect } from "effect";

const fetchProfile = Effect.succeed({ name: "Paul" });
const fetchPermissions = Effect.succeed(["admin", "editor"]);

// ❌ WRONG: This will only return either the profile OR the permissions,
// whichever resolves first. You will lose the other piece of data.
const incompleteData = Effect.race(fetchProfile, fetchPermissions);

// ✅ CORRECT: Use Effect.all when you need all the results.
const completeData = Effect.all([fetchProfile, fetchPermissions]);

Rationale:

When you have multiple effects that can produce the same type of result, and you only care about the one that finishes first, use Effect.race(effectA, effectB).


Effect.race is a powerful concurrency primitive for performance and resilience. It starts all provided effects in parallel. The moment one of them succeeds, Effect.race immediately interrupts all the other "losing" effects and returns the winning result. If one of the effects fails before any have succeeded, the race is not over; the remaining effects continue to run. The entire race only fails if all participating effects fail.

This is commonly used for:

  • Performance: Querying multiple redundant data sources (e.g., two API replicas) and taking the response from whichever is faster.
  • Implementing Timeouts: Racing a primary effect against a delayed Effect.fail, effectively creating a timeout mechanism.


Concurrency Pattern 2: Rate Limit Concurrent Access with Semaphore

Rule: Use Semaphore to limit concurrent access to resources, preventing overload and enabling fair resource distribution.

Good Example:

This example demonstrates limiting concurrent database connections using a Semaphore, preventing connection pool exhaustion.

import { Effect, Semaphore, Fiber } from "effect";

interface QueryResult {
  readonly id: number;
  readonly result: string;
  readonly duration: number;
}

// Simulate a database query that holds a connection
const executeQuery = (
  queryId: number,
  connectionId: number,
  durationMs: number
): Effect.Effect<QueryResult> =>
  Effect.gen(function* () {
    const startTime = Date.now();

    yield* Effect.log(
      `[Query ${queryId}] Using connection ${connectionId}, duration: ${durationMs}ms`
    );

    // Simulate query execution
    yield* Effect.sleep(`${durationMs} millis`);

    const duration = Date.now() - startTime;

    return {
      id: queryId,
      result: `Result from query ${queryId}`,
      duration,
    };
  });

// Pool configuration
interface ConnectionPoolConfig {
  readonly maxConnections: number;
  readonly queryTimeout?: number;
}

// Create a rate-limited query executor
const createRateLimitedQueryExecutor = (
  config: ConnectionPoolConfig
): Effect.Effect<
  (queryId: number, durationMs: number) => Effect.Effect<QueryResult>
> =>
  Effect.gen(function* () {
    const semaphore = yield* Semaphore.make(config.maxConnections);
    let connectionCounter = 0;

    return (queryId: number, durationMs: number) =>
      Effect.gen(function* () {
        // Acquire a permit (wait if none available)
        yield* Semaphore.acquire(semaphore);

        const connectionId = ++connectionCounter;

        // Use try-finally to ensure permit is released
        const result = yield* executeQuery(queryId, connectionId, durationMs).pipe(
          Effect.ensuring(
            Semaphore.release(semaphore).pipe(
              Effect.tap(() =>
                Effect.log(`[Query ${queryId}] Released connection ${connectionId}`)
              )
            )
          )
        );

        return result;
      });
  });

// Simulate multiple queries arriving
const program = Effect.gen(function* () {
  const executor = yield* createRateLimitedQueryExecutor({
    maxConnections: 3, // Only 3 concurrent connections
  });

  // Generate 10 queries with varying durations
  const queries = Array.from({ length: 10 }, (_, i) => ({
    id: i + 1,
    duration: 500 + Math.random() * 1500, // 500-2000ms
  }));

  console.log(`\n[POOL] Starting with max 3 concurrent connections\n`);

  // Execute all queries with concurrency limit
  const results = yield* Effect.all(
    queries.map((q) =>
      executor(q.id, Math.round(q.duration)).pipe(Effect.fork)
    )
  ).pipe(
    Effect.andThen((fibers) =>
      Effect.all(fibers.map((fiber) => Fiber.join(fiber)))
    )
  );

  console.log(`\n[POOL] All queries completed\n`);

  // Summary
  const totalDuration = results.reduce((sum, r) => sum + r.duration, 0);
  const avgDuration = totalDuration / results.length;

  console.log(`[SUMMARY]`);
  console.log(`  Total queries: ${results.length}`);
  console.log(`  Avg duration: ${Math.round(avgDuration)}ms`);
  console.log(`  Total time: ${Math.max(...results.map((r) => r.duration))}ms (parallel)`);
});

Effect.runPromise(program);

This pattern:

  1. Creates a Semaphore with fixed permit count
  2. Acquires permit before using connection
  3. Executes operation while holding permit
  4. Releases permit in finally block (guaranteed)
  5. Fair queuing of waiting queries

Rationale:

When you need to limit how many operations can run concurrently (e.g., max 10 database connections, max 5 API calls per second), use Semaphore. A Semaphore tracks a pool of permits; operations acquire a permit before proceeding and release it when done. Waiting operations are queued fairly.


Resource constraints require limiting concurrency:

  • Connection pools: Database limited to N connections
  • API rate limits: Service allows only M requests per second
  • Memory limits: Large operations can't all run simultaneously
  • CPU constraints: Too many threads waste cycles on context switching
  • Backpressure: Prevent downstream from being overwhelmed

Without Semaphore:

  • All operations run simultaneously, exhausting resources
  • Connection pool overflows, requests fail
  • Memory pressure causes garbage collection pauses
  • No fair ordering (first-come-first-served)

With Semaphore:

  • Fixed concurrency limit
  • Fair queuing of waiting operations
  • Backpressure naturally flows upstream
  • Clear ownership of permits


Manage Shared State Safely with Ref

Rule: Use Ref to manage shared, mutable state concurrently, ensuring atomicity.

Good Example:

This program simulates 1,000 concurrent fibers all trying to increment a shared counter. Because we use Ref.update, every single increment is applied atomically, and the final result is always correct.

import { Effect, Ref } from "effect";

const program = Effect.gen(function* () {
  // Create a new Ref with an initial value of 0
  const ref = yield* Ref.make(0);

  // Define an effect that increments the counter by 1
  const increment = Ref.update(ref, (n) => n + 1);

  // Create an array of 1,000 increment effects
  const tasks = Array.from({ length: 1000 }, () => increment);

  // Run all 1,000 effects concurrently
  yield* Effect.all(tasks, { concurrency: "unbounded" });

  // Get the final value of the counter
  return yield* Ref.get(ref);
});

// The result will always be 1000
const programWithLogging = Effect.gen(function* () {
  const result = yield* program;
  yield* Effect.log(`Final counter value: ${result}`);
  return result;
});

Effect.runPromise(programWithLogging);

Anti-Pattern:

The anti-pattern is using a standard JavaScript variable for shared state. The following example is not guaranteed to produce the correct result.

import { Effect } from "effect";

// ❌ WRONG: This is a classic race condition.
const programWithRaceCondition = Effect.gen(function* () {
  let count = 0; // A plain, mutable variable

  // An effect that reads, increments, and writes the variable
  const increment = Effect.sync(() => {
    const current = count;
    // Another fiber could run between this read and the write below!
    count = current + 1;
  });

  const tasks = Array.from({ length: 1000 }, () => increment);

  yield* Effect.all(tasks, { concurrency: "unbounded" });

  return count;
});

// The result is unpredictable and will likely be less than 1000.
Effect.runPromise(programWithRaceCondition).then(console.log);

Rationale:

When you need to share mutable state between different concurrent fibers, create a Ref<A>. Use Ref.get to read the value and Ref.update or Ref.set to modify it. All operations on a Ref are atomic.


Directly using a mutable variable (e.g., let myState = ...) in a concurrent system is dangerous. Multiple fibers could try to read and write to it at the same time, leading to race conditions and unpredictable results.

Ref solves this by wrapping the state in a fiber-safe container. It's like a synchronized, in-memory cell. All operations on a Ref are atomic effects, guaranteeing that updates are applied correctly without being interrupted or interleaved with other updates. This eliminates race conditions and ensures data integrity.



Run Independent Effects in Parallel with Effect.all

Rule: Use Effect.all to execute a collection of independent effects concurrently.

Good Example:

Imagine fetching a user's profile and their latest posts from two different API endpoints. These are independent operations and can be run in parallel to save time.

import { Effect } from "effect";

// Simulate fetching a user, takes 1 second
const fetchUser = Effect.succeed({ id: 1, name: "Paul" }).pipe(
  Effect.delay("1 second")
);

// Simulate fetching posts, takes 1.5 seconds
const fetchPosts = Effect.succeed([{ title: "Effect is great" }]).pipe(
  Effect.delay("1.5 seconds")
);

// Run both effects concurrently - must specify concurrency option!
const program = Effect.all([fetchUser, fetchPosts], {
  concurrency: "unbounded",
});

// The resulting effect will succeed with a tuple: [{id, name}, [{title}]]
// Total execution time will be ~1.5 seconds (the duration of the longest task).
const programWithLogging = Effect.gen(function* () {
  const results = yield* program;
  yield* Effect.log(`Results: ${JSON.stringify(results)}`);
  return results;
});

Effect.runPromise(programWithLogging);

Anti-Pattern:

The anti-pattern is running independent tasks sequentially using Effect.gen. This is inefficient and unnecessarily slows down your application.

import { Effect } from "effect";
import { fetchUser, fetchPosts } from "./somewhere"; // From previous example

// ❌ WRONG: This is inefficient.
const program = Effect.gen(function* () {
  // fetchUser runs and completes...
  const user = yield* fetchUser;
  // ...only then does fetchPosts begin.
  const posts = yield* fetchPosts;
  return [user, posts];
});

// Total execution time will be ~2.5 seconds (1s + 1.5s),
// which is a full second slower than the parallel version.
Effect.runPromise(program).then(console.log);

Rationale:

When you have multiple Effects that do not depend on each other's results, run them concurrently using Effect.all. This will execute all effects at the same time and return a new Effect that succeeds with a tuple containing all the results.


Running tasks sequentially when they could be done in parallel is a common source of performance bottlenecks. Effect.all is the solution. It's the direct equivalent of Promise.all in the Effect ecosystem.

Instead of waiting for Task A to finish before starting Task B, Effect.all starts all tasks simultaneously. The total time to complete is determined by the duration of the longest running effect, not the sum of all durations. If any single effect in the collection fails, the entire Effect.all will fail immediately.



Concurrency Pattern 3: Coordinate Multiple Fibers with Latch

Rule: Use Latch to coordinate multiple fibers awaiting a common completion signal, enabling fan-out/fan-in and barrier synchronization patterns.

Good Example:

This example demonstrates a fan-out/fan-in pattern: spawn 5 worker fibers that process tasks in parallel, and coordinate to know when all are complete.

import { Effect, Latch, Fiber, Ref } from "effect";

interface WorkResult {
  readonly workerId: number;
  readonly taskId: number;
  readonly result: string;
  readonly duration: number;
}

// Simulate a long-running task
const processTask = (
  workerId: number,
  taskId: number
): Effect.Effect<WorkResult> =>
  Effect.gen(function* () {
    const startTime = Date.now();
    const duration = 100 + Math.random() * 400; // 100-500ms

    yield* Effect.log(
      `[Worker ${workerId}] Starting task ${taskId} (duration: ${Math.round(duration)}ms)`
    );

    yield* Effect.sleep(`${Math.round(duration)} millis`);

    const elapsed = Date.now() - startTime;

    yield* Effect.log(
      `[Worker ${workerId}] ✓ Completed task ${taskId} in ${elapsed}ms`
    );

    return {
      workerId,
      taskId,
      result: `Result from worker ${workerId} on task ${taskId}`,
      duration: elapsed,
    };
  });

// Fan-out/Fan-in with Latch
const fanOutFanIn = Effect.gen(function* () {
  const numWorkers = 5;
  const tasksPerWorker = 3;

  // Create latch: will countdown from (numWorkers) when all workers complete
  const workersCompleteLatch = yield* Latch.make(numWorkers);

  // Track results from all workers
  const results = yield* Ref.make<WorkResult[]>([]);

  // Worker fiber that processes tasks sequentially
  const createWorker = (workerId: number) =>
    Effect.gen(function* () {
      try {
        yield* Effect.log(`[Worker ${workerId}] ▶ Starting`);

        // Process multiple tasks
        for (let i = 1; i <= tasksPerWorker; i++) {
          const result = yield* processTask(workerId, i);
          yield* Ref.update(results, (rs) => [...rs, result]);
        }

        yield* Effect.log(`[Worker ${workerId}] ✓ All tasks completed`);
      } finally {
        // Signal completion to latch
        yield* Latch.countDown(workersCompleteLatch);
        yield* Effect.log(`[Worker ${workerId}] Signaled latch`);
      }
    });

  // Spawn all workers as background fibers
  console.log(`\n[COORDINATOR] Spawning ${numWorkers} workers...\n`);

  const workerFibers = yield* Effect.all(
    Array.from({ length: numWorkers }, (_, i) =>
      createWorker(i + 1).pipe(Effect.fork)
    )
  );

  // Wait for all workers to complete
  console.log(`\n[COORDINATOR] Waiting for all workers to finish...\n`);

  yield* Latch.await(workersCompleteLatch);

  console.log(`\n[COORDINATOR] All workers completed!\n`);

  // Join all fibers to ensure cleanup
  yield* Effect.all(workerFibers.map((fiber) => Fiber.join(fiber)));

  // Aggregate results
  const allResults = yield* Ref.get(results);

  console.log(`[SUMMARY]`);
  console.log(`  Total workers: ${numWorkers}`);
  console.log(`  Tasks per worker: ${tasksPerWorker}`);
  console.log(`  Total tasks: ${allResults.length}`);
  console.log(
    `  Avg task duration: ${Math.round(
      allResults.reduce((sum, r) => sum + r.duration, 0) / allResults.length
    )}ms`
  );
});

Effect.runPromise(fanOutFanIn);

This pattern:

  1. Creates Latch with count = number of workers
  2. Spawns worker fibers as background tasks
  3. Each worker processes tasks independently
  4. Signals Latch when work completes (countDown)
  5. Coordinator awaits until all workers signal
  6. Aggregates results from all workers

Rationale:

When you need multiple fibers to coordinate and wait for a shared completion condition, use Latch. A Latch is a countdown synchronization object: you initialize it with N, each fiber calls countDown(), and all waiting fibers are released when the count reaches zero. This enables fan-out/fan-in patterns and barrier synchronization.


Multi-fiber coordination requires synchronization:

  • Parallel initialization: Wait for all services to start before proceeding
  • Fan-out/fan-in: Spawn multiple workers, collect results when all done
  • Barrier synchronization: All fibers wait at a checkpoint before proceeding
  • Graceful shutdown: Wait for all active fibers to complete
  • Aggregation patterns: Process streams in parallel, combine when ready

Unlike Deferred (one producer signals once), Latch:

  • Supports multiple signalers (each countDown())
  • Used with known count of participants (countdown from N to 0)
  • Enables barrier patterns (all wait for all)
  • Fair queuing of waiting fibers


Concurrency Pattern 5: Broadcast Events with PubSub

Rule: Use PubSub to broadcast events to multiple subscribers, enabling event-driven architectures where publishers and subscribers are loosely coupled.

Good Example:

This example demonstrates a multi-subscriber event broadcast system with independent handlers.

import { Effect, PubSub, Fiber, Ref } from "effect";

interface StateChangeEvent {
  readonly id: string;
  readonly oldValue: string;
  readonly newValue: string;
  readonly timestamp: number;
}

interface Subscriber {
  readonly name: string;
  readonly events: StateChangeEvent[];
}

// Create subscribers that react to events
const createSubscriber = (
  name: string,
  pubsub: PubSub.PubSub<StateChangeEvent>,
  events: Ref.Ref<StateChangeEvent[]>
): Effect.Effect<void> =>
  Effect.gen(function* () {
    yield* Effect.log(`[${name}] ✓ Subscribed`);

    // Get subscriber handle
    const subscription = yield* PubSub.subscribe(pubsub);

    // Listen for events indefinitely
    while (true) {
      const event = yield* subscription.take();

      yield* Effect.log(
        `[${name}] Received event: ${event.oldValue} → ${event.newValue}`
      );

      // Simulate processing
      yield* Effect.sleep("50 millis");

      // Store event (example action)
      yield* Ref.update(events, (es) => [...es, event]);

      yield* Effect.log(`[${name}] ✓ Processed event`);
    }
  });

// Publisher that broadcasts events
const publisher = (
  pubsub: PubSub.PubSub<StateChangeEvent>,
  eventCount: number
): Effect.Effect<void> =>
  Effect.gen(function* () {
    yield* Effect.log(`[PUBLISHER] Starting, publishing ${eventCount} events`);

    for (let i = 1; i <= eventCount; i++) {
      const event: StateChangeEvent = {
        id: `event-${i}`,
        oldValue: `state-${i - 1}`,
        newValue: `state-${i}`,
        timestamp: Date.now(),
      };

      // Publish to all subscribers
      const size = yield* PubSub.publish(pubsub, event);

      yield* Effect.log(
        `[PUBLISHER] Published event to ${size} subscribers`
      );

      // Simulate delay between events
      yield* Effect.sleep("200 millis");
    }

    yield* Effect.log(`[PUBLISHER] ✓ All events published`);
  });

// Main: coordinate publisher and multiple subscribers
const program = Effect.gen(function* () {
  // Create PubSub with bounded capacity
  const pubsub = yield* PubSub.bounded<StateChangeEvent>(5);

  // Create storage for each subscriber's events
  const subscriber1Events = yield* Ref.make<StateChangeEvent[]>([]);
  const subscriber2Events = yield* Ref.make<StateChangeEvent[]>([]);
  const subscriber3Events = yield* Ref.make<StateChangeEvent[]>([]);

  console.log(`\n[MAIN] Starting PubSub event broadcast system\n`);

  // Subscribe 3 independent subscribers
  const sub1Fiber = yield* createSubscriber(
    "SUBSCRIBER-1",
    pubsub,
    subscriber1Events
  ).pipe(Effect.fork);

  const sub2Fiber = yield* createSubscriber(
    "SUBSCRIBER-2",
    pubsub,
    subscriber2Events
  ).pipe(Effect.fork);

  const sub3Fiber = yield* createSubscriber(
    "SUBSCRIBER-3",
    pubsub,
    subscriber3Events
  ).pipe(Effect.fork);

  // Wait for subscriptions to establish
  yield* Effect.sleep("100 millis");

  // Start publisher
  const publisherFiber = yield* publisher(pubsub, 5).pipe(Effect.fork);

  // Wait for publisher to finish
  yield* Fiber.join(publisherFiber);

  // Wait a bit for subscribers to process last events
  yield* Effect.sleep("1 second");

  // Shut down
  yield* PubSub.shutdown(pubsub);
  yield* Fiber.join(sub1Fiber).pipe(Effect.catchAll(() => Effect.void));
  yield* Fiber.join(sub2Fiber).pipe(Effect.catchAll(() => Effect.void));
  yield* Fiber.join(sub3Fiber).pipe(Effect.catchAll(() => Effect.void));

  // Print summary
  const events1 = yield* Ref.get(subscriber1Events);
  const events2 = yield* Ref.get(subscriber2Events);
  const events3 = yield* Ref.get(subscriber3Events);

  console.log(`\n[SUMMARY]`);
  console.log(`  Subscriber 1 received: ${events1.length} events`);
  console.log(`  Subscriber 2 received: ${events2.length} events`);
  console.log(`  Subscriber 3 received: ${events3.length} events`);
});

Effect.runPromise(program);

This pattern:

  1. Creates PubSub for event distribution
  2. Multiple subscribers listen independently
  3. Publisher broadcasts events to all
  4. Each subscriber processes at own pace

Rationale:

When multiple fibers need to react to the same events, use PubSub:

  • Publisher sends events once
  • Subscribers each receive a copy
  • Decoupled: Publisher doesn't know about subscribers
  • Fan-out: One event → multiple independent handlers

PubSub variants: bounded (backpressure), unbounded, sliding.


Event distribution without PubSub creates coupling:

  • Direct references: Publisher calls subscribers directly (tight coupling)
  • Ordering issues: Publisher blocks on slowest subscriber
  • Scalability: Adding subscribers slows down publisher
  • Testing: Hard to mock multiple subscribers

PubSub enables:

  • Loose coupling: Publishers emit, subscribers listen independently
  • Parallel delivery: All subscribers notified simultaneously
  • Scalability: Add subscribers without affecting publisher
  • Testing: Mock single PubSub rather than all subscribers

Real-world example: System state changes

  • Direct: StateManager calls UserNotifier, AuditLogger, MetricsCollector (tight coupling)
  • PubSub: StateManager publishes StateChanged event; subscribers listen independently


Process a Collection in Parallel with Effect.forEach

Rule: Use Effect.forEach with the concurrency option to process a collection in parallel with a fixed limit.

Good Example:

Imagine you have a list of 100 user IDs and you need to fetch the data for each one. Effect.forEach with a concurrency of 10 will process them in controlled parallel batches.

import { Clock, Effect } from "effect";

// Mock function to simulate fetching a user by ID
const fetchUserById = (id: number) =>
  Effect.gen(function* () {
    yield* Effect.logInfo(`Fetching user ${id}...`);
    yield* Effect.sleep("1 second"); // Simulate network delay
    return { id, name: `User ${id}`, email: `user${id}@example.com` };
  });

const userIds = Array.from({ length: 10 }, (_, i) => i + 1);

// Process the entire array, but only run 5 fetches at a time.
const program = Effect.gen(function* () {
  yield* Effect.logInfo("Starting parallel processing...");

  const startTime = yield* Clock.currentTimeMillis;
  const users = yield* Effect.forEach(userIds, fetchUserById, {
    concurrency: 5, // Limit to 5 concurrent operations
  });
  const endTime = yield* Clock.currentTimeMillis;

  yield* Effect.logInfo(
    `Processed ${users.length} users in ${endTime - startTime}ms`
  );
  yield* Effect.logInfo(
    `First few users: ${JSON.stringify(users.slice(0, 3), null, 2)}`
  );

  return users;
});

// The result will be an array of all user objects.
// The total time will be much less than running them sequentially.
Effect.runPromise(program);

Anti-Pattern:

The anti-pattern is using Effect.all to process a large or dynamically-sized collection. This can lead to unpredictable and potentially catastrophic resource consumption.

import { Effect } from "effect";
import { userIds, fetchUserById } from "./somewhere"; // From previous example

// ❌ DANGEROUS: This will attempt to start 100 concurrent network requests.
// If userIds had 10,000 items, this could crash your application or get you blocked by an API.
const program = Effect.all(userIds.map(fetchUserById));

Rationale:

To process an iterable (like an array) of items concurrently, use Effect.forEach. To avoid overwhelming systems, always specify the { concurrency: number } option to limit how many effects run at the same time.


Running Effect.all on a large array of tasks is dangerous. If you have 1,000 items, it will try to start 1,000 concurrent fibers at once, which can exhaust memory, overwhelm your CPU, or hit API rate limits.

Effect.forEach with a concurrency limit solves this problem elegantly. It acts as a concurrent processing pool. It will start processing items up to your specified limit (e.g., 10 at a time). As soon as one task finishes, it will pick up the next available item from the list, ensuring that no more than 10 tasks are ever running simultaneously. This provides massive performance gains over sequential processing while maintaining stability and control.



Concurrency Pattern 6: Race and Timeout Competing Effects

Rule: Use race to compete effects and timeout to enforce deadlines, enabling cancellation when operations exceed time limits or complete.

Good Example:

This example demonstrates racing competing effects and handling timeouts.

import { Effect, Fiber } from "effect";

interface DataSource {
  readonly name: string;
  readonly latencyMs: number;
}

// Simulate fetching from different sources
const fetchFromSource = (source: DataSource): Effect.Effect<string> =>
  Effect.gen(function* () {
    yield* Effect.log(
      `[${source.name}] Starting fetch (latency: ${source.latencyMs}ms)`
    );

    yield* Effect.sleep(`${source.latencyMs} millis`);

    const result = `Data from ${source.name}`;

    yield* Effect.log(`[${source.name}] ✓ Completed`);

    return result;
  });

// Main: demonstrate race patterns
const program = Effect.gen(function* () {
  console.log(`\n[RACE] Competing effects with race and timeout\n`);

  // Example 1: Simple race (fastest wins)
  console.log(`[1] Racing 3 data sources:\n`);

  const sources: DataSource[] = [
    { name: "Primary DC", latencyMs: 200 },
    { name: "Backup DC", latencyMs: 150 },
    { name: "Cache", latencyMs: 50 },
  ];

  const raceResult = yield* Effect.race(
    fetchFromSource(sources[0]),
    Effect.race(fetchFromSource(sources[1]), fetchFromSource(sources[2]))
  );

  console.log(`\nWinner: ${raceResult}\n`);

  // Example 2: Timeout - succeed within deadline
  console.log(`[2] Timeout with fast operation:\n`);

  const fastOp = fetchFromSource({ name: "Fast Op", latencyMs: 100 }).pipe(
    Effect.timeout("500 millis")
  );

  const fastResult = yield* fastOp;

  console.log(`✓ Completed within timeout: ${fastResult}\n`);

  // Example 3: Timeout - exceed deadline
  console.log(`[3] Timeout with slow operation:\n`);

  const slowOp = fetchFromSource({ name: "Slow Op", latencyMs: 2000 }).pipe(
    Effect.timeout("500 millis"),
    Effect.either
  );

  const timeoutResult = yield* slowOp;

  if (timeoutResult._tag === "Left") {
    console.log(`✗ Operation timed out after 500ms\n`);
  }

  // Example 4: Race with timeout fallback
  console.log(`[4] Race with fallback on timeout:\n`);

  const primary = fetchFromSource({ name: "Primary", latencyMs: 300 });

  const fallback = fetchFromSource({ name: "Fallback", latencyMs: 100 });

  const raceWithFallback = primary.pipe(
    Effect.timeout("150 millis"),
    Effect.catchAll(() => {
      yield* Effect.log(`[PRIMARY] Timed out, using fallback`);

      return fallback;
    })
  );

  const fallbackResult = yield* raceWithFallback;

  console.log(`Result: ${fallbackResult}\n`);

  // Example 5: Race all (collect all winners)
  console.log(`[5] Race all - multiple sources:\n`);

  const raceAllResult = yield* Effect.raceAll(
    sources.map((s) =>
      fetchFromSource(s).pipe(
        Effect.map((data) => ({ source: s.name, data }))
      )
    )
  );

  console.log(`First to complete: ${raceAllResult.source}\n`);
});

Effect.runPromise(program);

Rationale:

Race and timeout coordinate competing effects:

  • race: Multiple effects compete, first to succeed wins
  • timeout: Effect fails if not completed in time
  • raceAll: Race multiple effects, collect winners
  • timeoutFail: Fail with specific error on timeout

Pattern: Effect.race(effect1, effect2) or effect.pipe(Effect.timeout(duration))


Without race/timeout, competing effects create issues:

  • Deadlocks: Waiting for all to complete unnecessarily
  • Hanging requests: No deadline enforcement
  • Wasted resources: Slow operations continue indefinitely
  • No fallback: Can't switch to alternative on timeout

Race/timeout enable:

  • Fastest-wins: Take first success
  • Deadline enforcement: Fail after time limit
  • Resource cleanup: Cancel slower operations
  • Fallback patterns: Alternative if primary times out

Real-world example: Multi-datacenter request

  • Without race: Wait for slowest response
  • With race: Get response from fastest datacenter


Concurrency Pattern 1: Coordinate Async Operations with Deferred

Rule: Use Deferred for one-time async coordination between fibers, enabling multiple consumers to wait for a single producer's result.

Good Example:

This example demonstrates a service startup pattern where multiple workers wait for initialization to complete before starting processing.

import { Effect, Deferred, Fiber } from "effect";

interface ServiceConfig {
  readonly name: string;
  readonly port: number;
}

interface Service {
  readonly name: string;
  readonly isReady: Deferred.Deferred<void>;
}

// Simulate a service that takes time to initialize
const createService = (config: ServiceConfig): Effect.Effect<Service> =>
  Effect.gen(function* () {
    const isReady = yield* Deferred.make<void>();

    return { name: config.name, isReady };
  });

// Initialize the service (runs in background)
const initializeService = (service: Service): Effect.Effect<void> =>
  Effect.gen(function* () {
    yield* Effect.log(`[${service.name}] Starting initialization...`);

    // Simulate initialization work
    yield* Effect.sleep("1 second");

    yield* Effect.log(`[${service.name}] Initialization complete`);

    // Signal that service is ready
    yield* Deferred.succeed(service.isReady, undefined);
  });

// A worker that waits for service to be ready before starting
const createWorker = (
  id: number,
  services: Service[]
): Effect.Effect<void> =>
  Effect.gen(function* () {
    yield* Effect.log(`[Worker ${id}] Starting, waiting for services...`);

    // Wait for all services to be ready
    yield* Effect.all(
      services.map((service) =>
        Deferred.await(service.isReady).pipe(
          Effect.tapError((error) =>
            Effect.log(
              `[Worker ${id}] Error waiting for ${service.name}: ${error}`
            )
          )
        )
      )
    );

    yield* Effect.log(`[Worker ${id}] All services ready, starting work`);

    // Simulate worker processing
    for (let i = 0; i < 3; i++) {
      yield* Effect.sleep("500 millis");
      yield* Effect.log(`[Worker ${id}] Processing task ${i + 1}`);
    }

    yield* Effect.log(`[Worker ${id}] Complete`);
  });

// Main program
const program = Effect.gen(function* () {
  // Create services
  const apiService = yield* createService({ name: "API", port: 3000 });
  const dbService = yield* createService({ name: "Database", port: 5432 });
  const cacheService = yield* createService({ name: "Cache", port: 6379 });

  const services = [apiService, dbService, cacheService];

  // Start initializing services in background
  const initFibers = yield* Effect.all(
    services.map((service) => initializeService(service).pipe(Effect.fork))
  );

  // Start workers that wait for services
  const workerFibers = yield* Effect.all(
    [1, 2, 3].map((id) => createWorker(id, services).pipe(Effect.fork))
  );

  // Wait for all workers to complete
  yield* Effect.all(workerFibers.map((fiber) => Fiber.join(fiber)));

  // Cancel initialization fibers (they're done anyway)
  yield* Effect.all(initFibers.map((fiber) => Fiber.interrupt(fiber)));

  yield* Effect.log(`\n[MAIN] All workers completed`);
});

Effect.runPromise(program);

This pattern:

  1. Creates Deferred instances for each service's readiness
  2. Starts initialization in background fibers
  3. Workers wait for all services via Deferred.await
  4. Service signals completion via Deferred.succeed
  5. Workers resume when all dependencies ready

Rationale:

When you need multiple fibers to wait for a single async event (e.g., service initialization, data availability, external signal), use Deferred. A Deferred is a one-shot promise that exactly one fiber completes, and many fibers can wait for. This avoids polling and provides clean async signaling.


Many concurrent systems need to coordinate on events:

  • Service initialization: Wait for all services to start before accepting requests
  • Data availability: Wait for initial data load before processing
  • External events: Wait for webhook, signal, or message
  • Startup gates: All workers wait for leader to signal start

Without Deferred:

  • Polling wastes CPU (check repeatedly)
  • Callbacks become complex (multiple consumers)
  • No clean semantics for "wait for this one thing"
  • Error propagation unclear

With Deferred:

  • Non-blocking wait (fiber suspends)
  • One fiber produces, many consume
  • Clear completion or failure
  • Efficient wakeup when ready


Concurrency Pattern 4: Distribute Work with Queue

Rule: Use Queue to distribute work between producers and consumers with built-in backpressure, enabling flexible pipeline coordination.

Good Example:

This example demonstrates a producer-consumer pipeline with a bounded queue for buffering work items.

import { Effect, Queue, Fiber, Ref } from "effect";

interface WorkItem {
  readonly id: number;
  readonly data: string;
  readonly timestamp: number;
}

interface WorkResult {
  readonly itemId: number;
  readonly processed: string;
  readonly duration: number;
}

// Producer: generates work items
const producer = (
  queue: Queue.Enqueue<WorkItem>,
  count: number
): Effect.Effect<void> =>
  Effect.gen(function* () {
    yield* Effect.log(`[PRODUCER] Starting, generating ${count} items`);

    for (let i = 1; i <= count; i++) {
      const item: WorkItem = {
        id: i,
        data: `Item ${i}`,
        timestamp: Date.now(),
      };

      const start = Date.now();

      // Enqueue - will block if queue is full (backpressure)
      yield* Queue.offer(queue, item);

      const delay = Date.now() - start;

      if (delay > 0) {
        yield* Effect.log(
          `[PRODUCER] Item ${i} enqueued (waited ${delay}ms due to backpressure)`
        );
      } else {
        yield* Effect.log(`[PRODUCER] Item ${i} enqueued`);
      }

      // Simulate work
      yield* Effect.sleep("50 millis");
    }

    yield* Effect.log(`[PRODUCER] ✓ All items enqueued`);
  });

// Consumer: processes work items
const consumer = (
  queue: Queue.Dequeue<WorkItem>,
  consumerId: number,
  results: Ref.Ref<WorkResult[]>
): Effect.Effect<void> =>
  Effect.gen(function* () {
    yield* Effect.log(`[CONSUMER ${consumerId}] Starting`);

    while (true) {
      // Dequeue - will block if queue is empty
      const item = yield* Queue.take(queue).pipe(Effect.either);

      if (item._tag === "Left") {
        yield* Effect.log(`[CONSUMER ${consumerId}] Queue closed, stopping`);
        return;
      }

      const workItem = item.right;
      const startTime = Date.now();

      yield* Effect.log(
        `[CONSUMER ${consumerId}] Processing ${workItem.data}`
      );

      // Simulate processing
      yield* Effect.sleep("150 millis");

      const duration = Date.now() - startTime;
      const result: WorkResult = {
        itemId: workItem.id,
        processed: `${workItem.data} [processed by consumer ${consumerId}]`,
        duration,
      };

      yield* Ref.update(results, (rs) => [...rs, result]);

      yield* Effect.log(
        `[CONSUMER ${consumerId}] ✓ Completed ${workItem.data} in ${duration}ms`
      );
    }
  });

// Main: coordinate producer and consumers
const program = Effect.gen(function* () {
  // Create bounded queue with capacity 3
  const queue = yield* Queue.bounded<WorkItem>(3);
  const results = yield* Ref.make<WorkResult[]>([]);

  console.log(`\n[MAIN] Starting producer-consumer pipeline with queue size 3\n`);

  // Spawn producer
  const producerFiber = yield* producer(queue, 10).pipe(Effect.fork);

  // Spawn 2 consumers
  const consumer1 = yield* consumer(queue, 1, results).pipe(Effect.fork);
  const consumer2 = yield* consumer(queue, 2, results).pipe(Effect.fork);

  // Wait for producer to finish
  yield* Fiber.join(producerFiber);

  // Give consumers time to finish
  yield* Effect.sleep("3 seconds");

  // Close queue and wait for consumers
  yield* Queue.shutdown(queue);
  yield* Fiber.join(consumer1);
  yield* Fiber.join(consumer2);

  // Summary
  const allResults = yield* Ref.get(results);
  const totalDuration = allResults.reduce((sum, r) => sum + r.duration, 0);

  console.log(`\n[SUMMARY]`);
  console.log(`  Items processed: ${allResults.length}`);
  console.log(
    `  Avg processing time: ${Math.round(totalDuration / allResults.length)}ms`
  );
});

Effect.runPromise(program);

This pattern:

  1. Creates bounded queue with capacity (backpressure point)
  2. Producer enqueues items (blocks if full)
  3. Consumers dequeue and process (each at own pace)
  4. Queue coordinates flow automatically

Rationale:

When multiple fibers need to coordinate work asynchronously, use Queue:

  • Producers add items (enqueue)
  • Consumers remove and process items (dequeue)
  • Backpressure built-in: producers wait if queue is full
  • Decoupling: Producers don't block on consumer speed

Queue variants: bounded (size limit), unbounded (unlimited), dropping (discards on overflow).


Direct producer-consumer coordination creates problems:

  • Blocking: Producer waits for consumer to finish
  • Tight coupling: Producer depends on consumer speed
  • Memory pressure: Fast producer floods memory with results
  • No backpressure: Downstream overload propagates upstream

Queue solves these:

  • Asynchronous: Producer enqueues and continues
  • Decoupled: Producer/consumer independent
  • Backpressure: Producer waits when queue full (natural flow control)
  • Throughput: Consumer processes at own pace

Real-world example: API request handler + database writer

  • Direct: Handler waits for DB write (blocking, slow requests)
  • Queue: Handler enqueues write and returns immediately (responsive)


🟠 Advanced Patterns

Add Caching by Wrapping a Layer

Rule: Use a wrapping Layer to add cross-cutting concerns like caching to a service without altering its original implementation.

Good Example:

We have a WeatherService that makes slow API calls. We create a WeatherService.cached wrapper layer that adds an in-memory cache using a Ref and a Map.

import { Effect, Layer, Ref } from "effect";

// 1. Define the service interface
class WeatherService extends Effect.Service<WeatherService>()(
  "WeatherService",
  {
    sync: () => ({
      getForecast: (city: string) => Effect.succeed(`Sunny in ${city}`),
    }),
  }
) {}

// 2. The "Live" implementation that is slow
const WeatherServiceLive = Layer.succeed(
  WeatherService,
  WeatherService.of({
    _tag: "WeatherService",
    getForecast: (city) =>
      Effect.succeed(`Sunny in ${city}`).pipe(
        Effect.delay("2 seconds"),
        Effect.tap(() => Effect.log(`Fetched live forecast for ${city}`))
      ),
  })
);

// 3. The Caching Wrapper Layer
const WeatherServiceCached = Layer.effect(
  WeatherService,
  Effect.gen(function* () {
    // It REQUIRES the original WeatherService
    const underlyingService = yield* WeatherService;
    const cache = yield* Ref.make(new Map<string, string>());

    return WeatherService.of({
      _tag: "WeatherService",
      getForecast: (city) =>
        Ref.get(cache).pipe(
          Effect.flatMap((map) =>
            map.has(city)
              ? Effect.log(`Cache HIT for ${city}`).pipe(
                  Effect.as(map.get(city)!)
                )
              : Effect.log(`Cache MISS for ${city}`).pipe(
                  Effect.flatMap(() => underlyingService.getForecast(city)),
                  Effect.tap((forecast) =>
                    Ref.update(cache, (map) => map.set(city, forecast))
                  )
                )
          )
        ),
    });
  })
);

// 4. Compose the final layer. The wrapper is provided with the live implementation.
const AppLayer = Layer.provide(WeatherServiceCached, WeatherServiceLive);

// 5. The application logic
const program = Effect.gen(function* () {
  const weather = yield* WeatherService;
  yield* weather.getForecast("London"); // First call is slow (MISS)
  yield* weather.getForecast("London"); // Second call is instant (HIT)
});

Effect.runPromise(Effect.provide(program, AppLayer));

Anti-Pattern:

Modifying the original service implementation to include caching logic directly. This violates the Single Responsibility Principle by mixing the core logic of fetching weather with the cross-cutting concern of caching.

// ❌ WRONG: The service is now responsible for both its logic and its caching strategy.
const WeatherServiceWithInlineCache = Layer.effect(
  WeatherService,
  Effect.gen(function* () {
    const cache = yield* Ref.make(new Map<string, string>());
    return WeatherService.of({
      getForecast: (city) => {
        // ...caching logic mixed directly with fetching logic...
        return Effect.succeed("...");
      },
    });
  })
);

Rationale:

To add cross-cutting concerns like caching to a service, create a "wrapper" Layer. This is a layer that takes the original service's Layer as input (as a dependency) and returns a new Layer. The new layer provides the same service interface but wraps the original methods with additional logic (e.g., checking a cache before calling the original method).


You often want to add functionality like caching, logging, or metrics to a service without polluting its core business logic. The wrapper layer pattern is a clean way to achieve this.

By creating a layer that requires the original service, you can get an instance of it from the context, and then provide a new implementation of that same service that calls the original.

This approach is powerful because:

  • It's Non-Invasive: The original service (DatabaseLive) remains completely unchanged.
  • It's Composable: You can apply multiple wrappers. You could wrap a database layer with a caching layer, then wrap that with a metrics layer.
  • It's Explicit: The composition is clearly defined at the application's top level where you build your final AppLayer.


State Management Pattern 1: Synchronized Reference with SynchronizedRef

Rule: Use SynchronizedRef for thread-safe mutable state that must be updated consistently across concurrent operations, with atomic modifications.

Good Example:

This example demonstrates synchronized reference patterns.

import { Effect, Ref, Fiber, Deferred } from "effect";

interface Counter {
  readonly value: number;
  readonly updates: number;
}

interface Account {
  readonly balance: number;
  readonly transactions: string[];
}

const program = Effect.gen(function* () {
  console.log(
    `\n[SYNCHRONIZED REFERENCES] Concurrent state management\n`
  );

  // Example 1: Basic counter with atomic updates
  console.log(`[1] Atomic counter increments:\n`);

  const counter = yield* Ref.make<Counter>({
    value: 0,
    updates: 0,
  });

  // Simulate 5 concurrent increments
  const incrementTasks = Array.from({ length: 5 }, (_, i) =>
    Effect.gen(function* () {
      for (let j = 0; j < 20; j++) {
        yield* Ref.modify(counter, (current) => [
          undefined,
          {
            value: current.value + 1,
            updates: current.updates + 1,
          },
        ]);

        if (j === 0 || j === 19) {
          yield* Effect.log(
            `[FIBER ${i}] Increment ${j === 0 ? "start" : "end"}`
          );
        }
      }
    })
  );

  // Run concurrently
  yield* Effect.all(incrementTasks, { concurrency: "unbounded" });

  const finalCounter = yield* Ref.get(counter);

  yield* Effect.log(
    `[RESULT] Counter: ${finalCounter.value} (expected 100)`
  );
  yield* Effect.log(
    `[RESULT] Updates: ${finalCounter.updates} (expected 100)\n`
  );

  // Example 2: Bank account with transaction isolation
  console.log(`[2] Account with atomic transfers:\n`);

  const account = yield* Ref.make<Account>({
    balance: 1000,
    transactions: [],
  });

  const transfer = (amount: number, description: string) =>
    Ref.modify(account, (current) => {
      if (current.balance < amount) {
        // Insufficient funds, don't modify
        return [
          { success: false, reason: "insufficient-funds" },
          current, // Unchanged
        ];
      }

      // Atomic: deduct + record transaction
      return [
        { success: true, reason: "transferred" },
        {
          balance: current.balance - amount,
          transactions: [
            ...current.transactions,
            `${description}: -$${amount}`,
          ],
        },
      ];
    });

  // Test transfer
  const t1 = yield* transfer(100, "Coffee");

  yield* Effect.log(`[TRANSFER 1] ${t1.success ? "✓" : "✗"} ${t1.reason}`);

  const t2 = yield* transfer(2000, "Electronics");

  yield* Effect.log(`[TRANSFER 2] ${t2.success ? "✓" : "✗"} ${t2.reason}`);

  const t3 = yield* transfer(200, "Groceries");

  yield* Effect.log(`[TRANSFER 3] ${t3.success ? "✓" : "✗"} ${t3.reason}\n`);

  // Example 3: Concurrent reads don't block writes
  console.log(`[3] Concurrent reads and writes:\n`);

  const state = yield* Ref.make({ value: 0, readers: 0 });

  const read = Effect.gen(function* () {
    const snapshot = yield* Ref.get(state);

    yield* Effect.log(
      `[READ] Got value: ${snapshot.value}`
    );

    return snapshot.value;
  });

  const write = (newValue: number) =>
    Ref.set(state, { value: newValue, readers: 0 });

  // Concurrent operations
  const mixed = Effect.all(
    [
      read,
      write(10),
      read,
      write(20),
      read,
    ],
    { concurrency: "unbounded" }
  );

  yield* mixed;

  // Example 4: Compare-and-set pattern (retry on failure)
  console.log(`\n[4] Compare-and-set (optimistic updates):\n`);

  const versionedState = yield* Ref.make({ version: 0, data: "initial" });

  const updateWithVersion = (newData: string) =>
    Effect.gen(function* () {
      let retries = 0;

      while (retries < 3) {
        const current = yield* Ref.get(versionedState);

        // Try to update (check-and-set)
        const result = yield* Ref.modify(versionedState, (s) => {
          if (s.version === current.version) {
            // No concurrent update, proceed
            return [
              { success: true },
              {
                version: s.version + 1,
                data: newData,
              },
            ];
          }

          // Version changed, conflict
          return [{ success: false }, s];
        });

        if (result.success) {
          yield* Effect.log(
            `[CAS] Updated on attempt ${retries + 1}`
          );

          return true;
        }

        retries++;

        yield* Effect.log(
          `[CAS] Conflict detected, retrying (attempt ${retries + 1})`
        );
      }

      return false;
    });

  const casResult = yield* updateWithVersion("updated-data");

  yield* Effect.log(`[CAS] Success: ${casResult}\n`);

  // Example 5: State with subscriptions (notify on change)
  console.log(`[5] State changes with notification:\n`);

  interface Notification {
    oldValue: unknown;
    newValue: unknown;
    timestamp: Date;
  }

  const observedState = yield* Ref.make<{ value: number; lastChange: Date }>({
    value: 0,
    lastChange: new Date(),
  });

  const updateAndNotify = (newValue: number) =>
    Ref.modify(observedState, (current) => {
      const notification: Notification = {
        oldValue: current.value,
        newValue,
        timestamp: new Date(),
      };

      yield* Effect.log(
        `[NOTIFY] ${current.value} → ${newValue} at ${notification.timestamp.toISOString()}`
      );

      return [
        notification,
        {
          value: newValue,
          lastChange: notification.timestamp,
        },
      ];
    });

  // Trigger changes
  for (const val of [5, 10, 15]) {
    yield* updateAndNotify(val);
  }

  // Example 6: Atomic batch updates
  console.log(`\n[6] Batch atomic updates:\n`);

  interface BatchState {
    items: string[];
    locked: boolean;
    version: number;
  }

  const batchState = yield* Ref.make<BatchState>({
    items: [],
    locked: false,
    version: 0,
  });

  const addItems = (newItems: string[]) =>
    Ref.modify(batchState, (current) => {
      // All items added atomically
      return [
        { added: newItems.length },
        {
          items: [...current.items, ...newItems],
          locked: false,
          version: current.version + 1,
        },
      ];
    });

  const batch1 = yield* addItems(["item1", "item2", "item3"]);

  yield* Effect.log(
    `[BATCH 1] Added ${batch1.added} items`
  );

  const batch2 = yield* addItems(["item4", "item5"]);

  yield* Effect.log(
    `[BATCH 2] Added ${batch2.added} items`
  );

  const finalBatch = yield* Ref.get(batchState);

  yield* Effect.log(
    `[RESULT] Total items: ${finalBatch.items.length}, Version: ${finalBatch.version}`
  );
});

Effect.runPromise(program);

Rationale:

Synchronized references manage shared state safely:

  • Atomic updates: All-or-nothing modifications
  • Consistent reads: Snapshot consistency
  • Lock-free optimism: Try updates, retry on failure
  • Compare-and-set: Atomic check-and-update
  • Transaction safety: Multiple operations as one

Pattern: Ref.make(), Ref.modify(), Ref.set(), Ref.get()


Shared mutable state without synchronization causes problems:

Problem 1: Data races

  • Fiber A reads counter (value: 5)
  • Fiber B reads counter (value: 5)
  • Fiber A writes counter + 1 (value: 6)
  • Fiber B writes counter + 1 (value: 6)
  • Expected: 7, Got: 6 (lost update)

Problem 2: Inconsistent snapshots

  • Transaction reads user.balance (100)
  • User spent money elsewhere
  • Transaction reads user.balance again (90)
  • Now inconsistent within same transaction

Problem 3: Race conditions

  • Check inventory (10 items)
  • Check passes
  • Before purchase, inventory goes to 0 (race)
  • Purchase fails, user frustrated

Problem 4: Deadlocks

  • Fiber A locks state, tries to acquire another
  • Fiber B holds that state, tries to acquire first
  • Both stuck forever

Solutions:

Atomic operations:

  • Read and modify as single operation
  • No intermediate states visible
  • No race window
  • Guaranteed consistency

Compare-and-set:

  • "If value is X, change to Y" (atomic)
  • Fails if another fiber changed it
  • Retry automatically
  • No locks needed

Snapshot isolation:

  • Read complete snapshot
  • All operations see consistent view
  • Modifications build on snapshot
  • Merge changes safely


Manage Resource Lifecycles with Scope

Rule: Use Scope for fine-grained, manual control over resource lifecycles and cleanup guarantees.

Good Example:

This example shows how to acquire a resource (like a file handle), use it, and have Scope guarantee its release.

import { Effect, Scope } from "effect";

// Simulate acquiring and releasing a resource
const acquireFile = Effect.log("File opened").pipe(
  Effect.as({ write: (data: string) => Effect.log(`Wrote: ${data}`) })
);
const releaseFile = Effect.log("File closed.");

// Create a "scoped" effect. This effect, when used, will acquire the
// resource and register its release action with the current scope.
const scopedFile = Effect.acquireRelease(acquireFile, () => releaseFile);

// The main program that uses the scoped resource
const program = Effect.gen(function* () {
  // Effect.scoped "uses" the resource. It runs the acquire effect,
  // provides the resource to the inner effect, and ensures the
  // release effect is run when this block completes.
  const file = yield* Effect.scoped(scopedFile);

  yield* file.write("hello");
  yield* file.write("world");

  // The file will be automatically closed here.
});

Effect.runPromise(program);
/*
Output:
File opened
Wrote: hello
Wrote: world
File closed
*/

Anti-Pattern:

Manual resource management without the guarantees of Scope. This is brittle because if an error occurs after the resource is acquired but before it's released, the release logic is never executed.

import { Effect } from "effect";
import { acquireFile, releaseFile } from "./somewhere"; // From previous example

// ❌ WRONG: This will leak the resource if an error happens.
const program = Effect.gen(function* () {
  const file = yield* acquireFile;

  // If this operation fails...
  yield* Effect.fail("Something went wrong!");

  // ...this line will never be reached, and the file will never be closed.
  yield* releaseFile;
});

Rationale:

A Scope is a context that collects finalizers (cleanup effects). When you need fine-grained control over resource lifecycles, you can work with Scope directly. The most common pattern is to create a resource within a scope using Effect.acquireRelease and then use it via Effect.scoped.


Scope is the fundamental building block for all resource management in Effect. While higher-level APIs like Layer.scoped and Stream are often sufficient, understanding Scope is key to advanced use cases.

A Scope guarantees that any finalizers added to it will be executed when the scope is closed, regardless of whether the associated computation succeeds, fails, or is interrupted. This provides a rock-solid guarantee against resource leaks.

This is especially critical in concurrent applications. When a parent fiber is interrupted, it closes its scope, which in turn automatically interrupts all its child fibers and runs all their finalizers in a structured, predictable order.



Run Background Tasks with Effect.fork

Rule: Use Effect.fork to start a non-blocking background process and manage its lifecycle via its Fiber.

Good Example:

This program forks a background process that logs a "tick" every second. The main process does its own work for 5 seconds and then explicitly interrupts the background logger before exiting.

import { Effect, Fiber } from "effect";

// A long-running effect that logs a message every second, forever
// Effect.forever creates an infinite loop that repeats the effect
// This simulates a background service like a health check or monitoring task
const tickingClock = Effect.log("tick").pipe(
  Effect.delay("1 second"), // Wait 1 second between ticks
  Effect.forever // Repeat indefinitely - this creates an infinite effect
);

const program = Effect.gen(function* () {
  yield* Effect.log("Forking the ticking clock into the background.");

  // Start the clock, but don't wait for it.
  // Effect.fork creates a new fiber that runs concurrently with the main program
  // The main fiber continues immediately without waiting for the background task
  // This is essential for non-blocking background operations
  const clockFiber = yield* Effect.fork(tickingClock);

  // At this point, we have two fibers running:
  // 1. The main fiber (this program)
  // 2. The background clock fiber (ticking every second)

  yield* Effect.log("Main process is now doing other work for 5 seconds...");

  // Simulate the main application doing work
  // While this sleep happens, the background clock continues ticking
  // This demonstrates true concurrency - both fibers run simultaneously
  yield* Effect.sleep("5 seconds");

  yield* Effect.log("Main process is done. Interrupting the clock fiber.");

  // Stop the background process.
  // Fiber.interrupt sends an interruption signal to the fiber
  // This allows the fiber to perform cleanup operations before terminating
  // Without this, the background task would continue running indefinitely
  yield* Fiber.interrupt(clockFiber);

  // Important: Always clean up background fibers to prevent resource leaks
  // In a real application, you might want to:
  // 1. Use Fiber.join instead of interrupt to wait for graceful completion
  // 2. Handle interruption signals within the background task
  // 3. Implement proper shutdown procedures

  yield* Effect.log("Program finished.");

  // Key concepts demonstrated:
  // 1. Fork creates concurrent fibers without blocking
  // 2. Background tasks run independently of the main program
  // 3. Fiber interruption provides controlled shutdown
  // 4. Multiple fibers can run simultaneously on the same thread pool
});

// This example shows how to:
// - Run background tasks that don't block the main program
// - Manage fiber lifecycles (create, run, interrupt)
// - Coordinate between multiple concurrent operations
// - Properly clean up resources when shutting down
Effect.runPromise(program);

Anti-Pattern:

The anti-pattern is using Effect.fork when you immediately need the result of the computation. This is an overly complicated and less readable way of just running the effect directly.

import { Effect, Fiber } from "effect";

const someEffect = Effect.succeed(42);

// ❌ WRONG: This is unnecessarily complex.
const program = Effect.gen(function* () {
  const fiber = yield* Effect.fork(someEffect);
  // You immediately wait for the result, defeating the purpose of forking.
  const result = yield* Fiber.join(fiber);
  return result;
});

// ✅ CORRECT: Just run the effect directly if you need its result right away.
const simplerProgram = Effect.gen(function* () {
  const result = yield* someEffect;
  return result;
});

Rationale:

To start an Effect in the background without blocking the current execution flow, use Effect.fork. This immediately returns a Fiber, which is a handle to the running computation that you can use to manage its lifecycle (e.g., interrupt it or wait for its result).


Unlike Effect.all or a direct yield*, which wait for the computation to complete, Effect.fork is a "fire and forget" operation. It starts the effect on a new, concurrent fiber and immediately returns control to the parent fiber.

This is essential for managing long-running background tasks like:

  • A web server listener.
  • A message queue consumer.
  • A periodic cache cleanup job.

The returned Fiber object is your remote control for the background task. You can use Fiber.interrupt to safely stop it (ensuring all its finalizers are run) or Fiber.join to wait for it to complete at some later point.



Execute Long-Running Apps with Effect.runFork

Rule: Use Effect.runFork to launch a long-running application as a manageable, detached fiber.

Good Example:

This example starts a simple "server" that runs forever. We use runFork to launch it and then use the returned Fiber to shut it down gracefully after 5 seconds.

import { Effect, Fiber } from "effect";

// A server that listens for requests forever
const server = Effect.log("Server received a request.").pipe(
  Effect.delay("1 second"),
  Effect.forever
);

Effect.runSync(Effect.log("Starting server..."));

// Launch the server as a detached, top-level fiber
const appFiber = Effect.runFork(server);

// In a real app, you would listen for OS signals.
// Here, we simulate a shutdown signal after 5 seconds.
setTimeout(() => {
  const shutdownProgram = Effect.gen(function* () {
    yield* Effect.log("Shutdown signal received. Interrupting server fiber...");
    // This ensures all cleanup logic within the server effect would run.
    yield* Fiber.interrupt(appFiber);
  });
  Effect.runPromise(shutdownProgram);
}, 5000);

Anti-Pattern:

Using runFork when you immediately need the result of the effect. If you call runFork and then immediately call Fiber.join on the result, you have simply implemented a more complex and less direct version of runPromise.

import { Effect, Fiber } from "effect";

const someEffect = Effect.succeed(42);

// ❌ WRONG: This is just a complicated way to write `Effect.runPromise(someEffect)`
const resultPromise = Effect.runFork(someEffect).pipe(
  Fiber.join,
  Effect.runPromise
);

Rationale:

To launch a long-running application (like a server or daemon) as a non-blocking, top-level process, use Effect.runFork. It immediately returns a Fiber representing your running application, which you can use to manage its lifecycle.


Unlike Effect.runPromise, which waits for the effect to complete, Effect.runFork starts the effect and immediately returns a Fiber. This is the ideal way to run an application that is meant to run forever, because it gives you a handle to the process.

The most critical use case for this is enabling graceful shutdown. You can start your application with runFork, and then set up listeners for OS signals (like SIGINT for Ctrl+C). When a shutdown signal is received, you call Fiber.interrupt on the application fiber, which guarantees that all finalizers (like closing database connections) are run before the process exits.



State Management Pattern 2: Observable State with SubscriptionRef

Rule: Combine Ref with PubSub to create observable state where changes trigger notifications, enabling reactive state management.

Good Example:

This example demonstrates observable state patterns.

import { Effect, Ref, PubSub, Stream } from "effect";

interface StateChange<T> {
  readonly previous: T;
  readonly current: T;
  readonly timestamp: Date;
  readonly reason: string;
}

interface Observable<T> {
  readonly get: () => Effect.Effect<T>;
  readonly set: (value: T, reason: string) => Effect.Effect<void>;
  readonly subscribe: () => Stream.Stream<StateChange<T>>;
  readonly modify: (f: (current: T) => T, reason: string) => Effect.Effect<void>;
}

const program = Effect.gen(function* () {
  console.log(
    `\n[OBSERVABLE STATE] Reactive state management\n`
  );

  // Create observable
  const createObservable = <T,>(initialValue: T): Effect.Effect<Observable<T>> =>
    Effect.gen(function* () {
      const state = yield* Ref.make(initialValue);
      const changeStream = yield* PubSub.unbounded<StateChange<T>>();

      return {
        get: () => Ref.get(state),

        set: (value: T, reason: string) =>
          Effect.gen(function* () {
            const previous = yield* Ref.get(state);

            if (previous === value) {
              return; // No change
            }

            yield* Ref.set(state, value);

            const change: StateChange<T> = {
              previous,
              current: value,
              timestamp: new Date(),
              reason,
            };

            yield* PubSub.publish(changeStream, change);
          }),

        subscribe: () =>
          PubSub.subscribe(changeStream),

        modify: (f: (current: T) => T, reason: string) =>
          Effect.gen(function* () {
            const previous = yield* Ref.get(state);
            const updated = f(previous);

            if (previous === updated) {
              return; // No change
            }

            yield* Ref.set(state, updated);

            const change: StateChange<T> = {
              previous,
              current: updated,
              timestamp: new Date(),
              reason,
            };

            yield* PubSub.publish(changeStream, change);
          }),
      };
    });

  // Example 1: Basic observable counter
  console.log(`[1] Observable counter:\n`);

  const counter = yield* createObservable(0);

  // Subscribe to changes
  const printChanges = counter.subscribe().pipe(
    Stream.tap((change) =>
      Effect.log(
        `[CHANGE] ${change.previous} → ${change.current} (${change.reason})`
      )
    ),
    Stream.take(5), // Limit to 5 changes for demo
    Stream.runDrain
  );

  // Make changes
  yield* counter.set(1, "increment");
  yield* counter.set(2, "increment");
  yield* counter.set(5, "reset");

  // Wait for changes to be processed
  yield* Effect.sleep("100 millis");

  // Example 2: Derived state (computed values)
  console.log(`\n[2] Derived state (total from items):\n`);

  interface ShoppingCart {
    readonly items: Array<{ id: string; price: number }>;
    readonly discount: number;
  }

  const cart = yield* createObservable<ShoppingCart>({
    items: [],
    discount: 0,
  });

  const computeTotal = (state: ShoppingCart): number => {
    const subtotal = state.items.reduce((sum, item) => sum + item.price, 0);
    return subtotal * (1 - state.discount);
  };

  // Create derived observable
  const total = yield* createObservable(computeTotal(yield* cart.get()));

  // Subscribe to cart changes, update total
  const updateTotalOnCartChange = cart.subscribe().pipe(
    Stream.tap((change) =>
      Effect.gen(function* () {
        const newTotal = computeTotal(change.current);

        yield* total.set(newTotal, "recalculated-from-cart");

        yield* Effect.log(
          `[TOTAL] Recalculated: $${newTotal.toFixed(2)}`
        );
      })
    ),
    Stream.take(10),
    Stream.runDrain
  );

  // Make cart changes
  yield* cart.modify(
    (state) => ({
      ...state,
      items: [
        ...state.items,
        { id: "item1", price: 19.99 },
      ],
    }),
    "add-item"
  );

  yield* cart.modify(
    (state) => ({
      ...state,
      items: [
        ...state.items,
        { id: "item2", price: 29.99 },
      ],
    }),
    "add-item"
  );

  yield* cart.modify(
    (state) => ({
      ...state,
      discount: 0.1,
    }),
    "apply-discount"
  );

  yield* Effect.sleep("200 millis");

  // Example 3: Effect triggering on state change
  console.log(`\n[3] Effects triggered by state changes:\n`);

  type AppStatus = "idle" | "loading" | "ready" | "error";

  const appStatus = yield* createObservable<AppStatus>("idle");

  // Define effects for each status
  const handleStatusChange = appStatus.subscribe().pipe(
    Stream.tap((change) =>
      Effect.gen(function* () {
        yield* Effect.log(
          `[STATUS] ${change.previous} → ${change.current}`
        );

        switch (change.current) {
          case "loading":
            yield* Effect.log(`[EFFECT] Starting loading animation`);
            break;

          case "ready":
            yield* Effect.log(`[EFFECT] Hiding spinner, showing content`);
            break;

          case "error":
            yield* Effect.log(`[EFFECT] Showing error message`);
            yield* Effect.log(`[TELEMETRY] Logging error event`);
            break;

          default:
            yield* Effect.log(`[EFFECT] Resetting UI`);
        }
      })
    ),
    Stream.take(6),
    Stream.runDrain
  );

  // Trigger status changes
  yield* appStatus.set("loading", "user-clicked");
  yield* appStatus.set("ready", "data-loaded");
  yield* appStatus.set("loading", "user-refreshed");
  yield* appStatus.set("error", "api-failed");

  yield* Effect.sleep("200 millis");

  // Example 4: Multi-level state aggregation
  console.log(`\n[4] Aggregated state from multiple sources:\n`);

  interface UserProfile {
    name: string;
    email: string;
    role: string;
  }

  interface AppState {
    user: UserProfile | null;
    notifications: number;
    theme: "light" | "dark";
  }

  const appState = yield* createObservable<AppState>({
    user: null,
    notifications: 0,
    theme: "light",
  });

  // Subscribe to track changes
  const trackChanges = appState.subscribe().pipe(
    Stream.tap((change) => {
      if (change.current.user && !change.previous.user) {
        return Effect.log(`[EVENT] User logged in: ${change.current.user.name}`);
      }

      if (!change.current.user && change.previous.user) {
        return Effect.log(`[EVENT] User logged out`);
      }

      if (change.current.notifications !== change.previous.notifications) {
        return Effect.log(
          `[NOTIFY] ${change.current.notifications} notifications`
        );
      }

      if (change.current.theme !== change.previous.theme) {
        return Effect.log(`[THEME] Switched to ${change.current.theme}`);
      }

      return Effect.succeed(undefined);
    }),
    Stream.take(10),
    Stream.runDrain
  );

  // Make changes
  yield* appState.modify(
    (state) => ({
      ...state,
      user: { name: "Alice", email: "alice@example.com", role: "admin" },
    }),
    "user-login"
  );

  yield* appState.modify(
    (state) => ({
      ...state,
      notifications: 5,
    }),
    "new-notifications"
  );

  yield* appState.modify(
    (state) => ({
      ...state,
      theme: "dark",
    }),
    "user-preference"
  );

  yield* Effect.sleep("200 millis");

  // Example 5: State snapshot and history
  console.log(`\n[5] State history tracking:\n`);

  interface HistoryEntry<T> {
    value: T;
    timestamp: Date;
    reason: string;
  }

  const history = yield* Ref.make<HistoryEntry<number>[]>([]);

  const trackedCounter = yield* createObservable(0);

  const trackHistory = trackedCounter.subscribe().pipe(
    Stream.tap((change) =>
      Effect.gen(function* () {
        yield* Ref.modify(history, (h) => [
          undefined,
          [
            ...h,
            {
              value: change.current,
              timestamp: change.timestamp,
              reason: change.reason,
            },
          ],
        ]);

        yield* Effect.log(
          `[HISTORY] Recorded: ${change.current} (${change.reason})`
        );
      })
    ),
    Stream.take(5),
    Stream.runDrain
  );

  // Make changes
  for (let i = 1; i <= 4; i++) {
    yield* trackedCounter.set(i, `step-${i}`);
  }

  yield* Effect.sleep("200 millis");

  // Print history
  const hist = yield* Ref.get(history);

  yield* Effect.log(`\n[HISTORY] ${hist.length} entries:`);

  for (const entry of hist) {
    yield* Effect.log(
      `  - ${entry.value} (${entry.reason})`
    );
  }
});

Effect.runPromise(program);

Rationale:

Observable state enables reactive patterns:

  • State binding: UI binds to state, auto-updates on change
  • Subscribers: Multiple handlers notified on change
  • Event streams: Changes become event streams
  • Derived state: Compute values from state changes
  • Effect triggering: Changes trigger side effects

Pattern: Combine Ref + PubSub or custom subscription system


Passive state causes problems:

Problem 1: Stale UI

  • State changes in backend
  • UI doesn't know
  • User sees old data
  • Manual refresh required

Problem 2: Cascading updates

  • User changes form field
  • Need to update 5 other fields
  • Manual imperative code
  • Fragile, easy to miss one

Problem 3: Derived state

  • Total = sum of items
  • Manual update on each item change
  • Duplicate code everywhere
  • Bug: total not updated when items change

Problem 4: Side effects

  • User enables feature
  • Multiple things must happen
  • Analytics, notifications, API calls
  • All imperative, hard to maintain

Solutions:

Observable state:

  • State change = event
  • Subscribers notified
  • UI binds directly
  • Auto-updates

Reactive flows:

  • Define how state flows
  • newTotal = items.sum()
  • Automatic recalculation
  • No manual updates

Side effect chaining:

  • When state changes to "complete"
  • Send notification
  • Log event
  • Trigger cleanup
  • All declaratively


Implement Graceful Shutdown for Your Application

Rule: Use Effect.runFork and OS signal listeners to implement graceful shutdown for long-running applications.

Good Example:

This example creates a server with a "scoped" database connection. It uses runFork to start the server and sets up a SIGINT handler to interrupt the server fiber, which in turn guarantees the database finalizer is called.

import { Effect, Layer, Fiber, Context, Scope } from "effect";
import * as http from "http";

// 1. A service with a finalizer for cleanup
class Database extends Effect.Service<Database>()("Database", {
  effect: Effect.gen(function* () {
    yield* Effect.log("Acquiring DB connection");
    return {
      query: () => Effect.succeed("data"),
    };
  }),
}) {}

// 2. The main server logic
const server = Effect.gen(function* () {
  const db = yield* Database;

  // Create server with proper error handling
  const httpServer = yield* Effect.sync(() => {
    const server = http.createServer((_req, res) => {
      Effect.runFork(
        Effect.provide(
          db.query().pipe(Effect.map((data) => res.end(data))),
          Database.Default
        )
      );
    });
    return server;
  });

  // Add a finalizer to close the server
  yield* Effect.addFinalizer(() =>
    Effect.gen(function* () {
      httpServer.close();
      yield* Effect.log("Server closed");
    })
  );

  // Start server with error handling
  yield* Effect.async<void, Error>((resume) => {
    httpServer.once("error", (err: Error) => {
      resume(Effect.fail(new Error(`Failed to start server: ${err.message}`)));
    });

    httpServer.listen(3456, () => {
      resume(Effect.succeed(void 0));
    });
  });

  yield* Effect.log("Server started on port 3456. Press Ctrl+C to exit.");

  // For testing purposes, we'll run for a short time instead of forever
  yield* Effect.sleep("2 seconds");
  yield* Effect.log("Shutting down gracefully...");
});

// 3. Provide the layer and launch with runFork
const app = Effect.provide(server.pipe(Effect.scoped), Database.Default);

// 4. Run the app and handle shutdown
Effect.runPromise(app).catch((error) => {
  Effect.runSync(Effect.logError("Application error: " + error));
  process.exit(1);
});

Anti-Pattern:

Letting the Node.js process exit without proper cleanup. If you run a long-running effect with Effect.runPromise or don't handle OS signals, pressing Ctrl+C will terminate the process abruptly, and none of your Effect finalizers will have a chance to run.

import { Effect } from "effect";
import { app } from "./somewhere"; // From previous example

// ❌ WRONG: This will run the server, but Ctrl+C will kill it instantly.
// The database connection finalizer will NOT be called.
Effect.runPromise(app);

Rationale:

To enable graceful shutdown for a long-running application:

  1. Define services with cleanup logic in scoped Layers using Effect.addFinalizer or Effect.acquireRelease.
  2. Launch your main application Effect using Effect.runFork to get a Fiber handle to the running process.
  3. Set up listeners for process signals like SIGINT (Ctrl+C) and SIGTERM.
  4. In the signal handler, call Fiber.interrupt on your application's fiber.

When a server process is terminated, you need to ensure that it cleans up properly. This includes closing database connections, finishing in-flight requests, and releasing file handles. Failing to do so can lead to resource leaks or data corruption.

Effect's structured concurrency makes this robust and easy. When a fiber is interrupted, Effect guarantees that it will run all finalizers registered within that fiber's scope, in the reverse order they were acquired.

By launching your app with runFork, you get a Fiber that represents the entire application. Triggering Fiber.interrupt on this top-level fiber initiates a clean, orderly shutdown sequence for all its resources.



Decouple Fibers with Queues and PubSub

Rule: Use Queue for point-to-point work distribution and PubSub for broadcast messaging between fibers.

Good Example:

A producer fiber adds jobs to a Queue, and a worker fiber takes jobs off the queue to process them.

import { Effect, Queue, Fiber } from "effect";

const program = Effect.gen(function* () {
  yield* Effect.logInfo("Starting queue demo...");

  // Create a bounded queue that can hold a maximum of 10 items.
  // This prevents memory issues by applying backpressure when the queue is full.
  // If a producer tries to add to a full queue, it will suspend until space is available.
  const queue = yield* Queue.bounded<string>(10);
  yield* Effect.logInfo("Created bounded queue");

  // Producer Fiber: Add a job to the queue every second.
  // This fiber runs independently and continuously produces work items.
  // The producer-consumer pattern decouples work generation from work processing.
  const producer = yield* Effect.gen(function* () {
    let i = 0;
    while (true) {
      const job = `job-${i++}`;
      yield* Effect.logInfo(`Producing ${job}...`);

      // Queue.offer adds an item to the queue. If the queue is full,
      // this operation will suspend the fiber until space becomes available.
      // This provides natural backpressure control.
      yield* Queue.offer(queue, job);

      // Sleep for 500ms between job creation. This controls the production rate.
      // Producer is faster than consumer (500ms vs 1000ms) to demonstrate queue buffering.
      yield* Effect.sleep("500 millis");
    }
  }).pipe(Effect.fork); // Fork creates a new fiber that runs concurrently

  yield* Effect.logInfo("Started producer fiber");

  // Worker Fiber: Take a job from the queue and process it.
  // This fiber runs independently and processes work items as they become available.
  // Multiple workers could be created to scale processing capacity.
  const worker = yield* Effect.gen(function* () {
    while (true) {
      // Queue.take removes and returns an item from the queue.
      // If the queue is empty, this operation will suspend the fiber
      // until an item becomes available. This prevents busy-waiting.
      const job = yield* Queue.take(queue);
      yield* Effect.logInfo(`Processing ${job}...`);

      // Simulate work by sleeping for 1 second.
      // This makes the worker slower than the producer, causing queue buildup.
      yield* Effect.sleep("1 second");
      yield* Effect.logInfo(`Completed ${job}`);
    }
  }).pipe(Effect.fork); // Fork creates another independent fiber

  yield* Effect.logInfo("Started worker fiber");

  // Let them run for a while...
  // The main fiber sleeps while the producer and worker fibers run concurrently.
  // During this time, you'll see the queue acting as a buffer between
  // the fast producer and slow worker.
  yield* Effect.logInfo("Running for 10 seconds...");
  yield* Effect.sleep("10 seconds");
  yield* Effect.logInfo("Done!");

  // Interrupt both fibers to clean up resources.
  // Fiber.interrupt sends an interruption signal to the fiber,
  // allowing it to perform cleanup operations before terminating.
  // This is safer than forcefully killing fibers.
  yield* Fiber.interrupt(producer);
  yield* Fiber.interrupt(worker);

  // Note: In a real application, you might want to:
  // 1. Drain the queue before interrupting workers
  // 2. Use Fiber.join to wait for graceful shutdown
  // 3. Handle interruption signals in the fiber loops
});

// Run the program
// This demonstrates the producer-consumer pattern with Effect fibers:
// - Fibers are lightweight threads that can be created in large numbers
// - Queues provide safe communication between fibers
// - Backpressure prevents resource exhaustion
// - Interruption allows for graceful shutdown
Effect.runPromise(program);

A publisher sends an event, and multiple subscribers react to it independently.

import { Effect, PubSub } from "effect";

const program = Effect.gen(function* () {
  const pubsub = yield* PubSub.bounded<string>(10);

  // Subscriber 1: The "Audit" service
  const auditSub = PubSub.subscribe(pubsub).pipe(
    Effect.flatMap((subscription) =>
      Effect.gen(function* () {
        while (true) {
          const event = yield* Queue.take(subscription);
          yield* Effect.log(`AUDIT: Received event: ${event}`);
        }
      })
    ),
    Effect.fork
  );

  // Subscriber 2: The "Notifier" service
  const notifierSub = PubSub.subscribe(pubsub).pipe(
    Effect.flatMap((subscription) =>
      Effect.gen(function* () {
        while (true) {
          const event = yield* Queue.take(subscription);
          yield* Effect.log(`NOTIFIER: Sending notification for: ${event}`);
        }
      })
    ),
    Effect.fork
  );

  // Give subscribers time to start
  yield* Effect.sleep("1 second");

  // Publisher: Publish an event that both subscribers will receive.
  yield* PubSub.publish(pubsub, "user_logged_in");
});

Anti-Pattern:

Simulating a queue with a simple Ref<A[]>. This approach is inefficient due to polling and is not safe from race conditions without manual, complex locking mechanisms. It also lacks critical features like back-pressure.

import { Effect, Ref } from "effect";

// ❌ WRONG: This is inefficient and prone to race conditions.
const program = Effect.gen(function* () {
  const queueRef = yield* Ref.make<string[]>([]);

  // Producer adds to the array
  const producer = Ref.update(queueRef, (q) => [...q, "new_item"]);

  // Consumer has to constantly poll the array to see if it's empty.
  const consumer = Ref.get(queueRef).pipe(
    Effect.flatMap(
      (q) =>
        q.length > 0
          ? Ref.set(queueRef, q.slice(1)).pipe(Effect.as(q[0]))
          : Effect.sleep("1 second").pipe(Effect.flatMap(() => consumer)) // Inefficient polling
    )
  );
});

Rationale:

To enable communication between independent, concurrent fibers, use one of Effect's specialized data structures:

  • Queue<A>: For distributing work items. Each item put on the queue is taken and processed by only one consumer.
  • PubSub<A>: For broadcasting events. Each message published is delivered to every subscriber.

Directly calling functions between different logical parts of a concurrent application creates tight coupling, making the system brittle and hard to scale. Queue and PubSub solve this by acting as asynchronous, fiber-safe message brokers.

This decouples the producer of a message from its consumer(s). The producer doesn't need to know who is listening, or how many listeners there are. This allows you to build resilient, scalable systems where you can add or remove workers/listeners without changing the producer's code.

Furthermore, bounded Queues and PubSubs provide automatic back-pressure. If consumers can't keep up, the producer will automatically pause before adding new items, preventing your system from becoming overloaded.



Poll for Status Until a Task Completes

Rule: Use Effect.race to run a repeating polling task that is automatically interrupted when a main task completes.

Good Example:

This program simulates a long-running data processing job. While it's running, a separate effect polls for its status every 2 seconds. When the main job finishes after 10 seconds, the polling automatically stops.

import { Effect, Schedule, Duration } from "effect";

// The main task that takes a long time to complete
const longRunningJob = Effect.log("Data processing complete!").pipe(
  Effect.delay(Duration.seconds(10))
);

// The polling task that checks the status
const pollStatus = Effect.log("Polling for job status: In Progress...");

// A schedule that repeats the polling task every 2 seconds, forever
const pollingSchedule = Schedule.fixed(Duration.seconds(2));

// The complete polling effect that will run indefinitely until interrupted
const repeatingPoller = pollStatus.pipe(Effect.repeat(pollingSchedule));

// Race the main job against the poller.
// The longRunningJob will win after 10 seconds, interrupting the poller.
const program = Effect.race(longRunningJob, repeatingPoller);

Effect.runPromise(program);
/*
Output:
Polling for job status: In Progress...
Polling for job status: In Progress...
Polling for job status: In Progress...
Polling for job status: In Progress...
Polling for job status: In Progress...
Data processing complete!
*/

Anti-Pattern:

Manually managing the lifecycle of the polling fiber. This is more verbose, imperative, and error-prone. You have to remember to interrupt the polling fiber in all possible exit paths (success, failure, etc.), which Effect.race does for you automatically.

import { Effect, Fiber } from "effect";
import { longRunningJob, repeatingPoller } from "./somewhere";

// ❌ WRONG: Manual fiber management is complex.
const program = Effect.gen(function* () {
  // Manually fork the poller into the background
  const pollerFiber = yield* Effect.fork(repeatingPoller);

  try {
    // Run the main job
    const result = yield* longRunningJob;
    return result;
  } finally {
    // You MUST remember to interrupt the poller when you're done.
    yield* Fiber.interrupt(pollerFiber);
  }
});

Rationale:

To run a periodic task (a "poller") that should only run for the duration of another main task, combine them using Effect.race. The main task will "win" the race upon completion, which automatically interrupts and cleans up the repeating polling effect.


This pattern elegantly solves the problem of coordinating a long-running job with a status-checking mechanism. Instead of manually managing fibers with fork and interrupt, you can declare this relationship with Effect.race.

The key is that the polling effect is set up to repeat on a schedule that runs indefinitely (or for a very long time). Because it never completes on its own, it can never "win" the race. The main task is the only one that can complete successfully. When it does, it wins the race, and Effect's structured concurrency guarantees that the losing effect (the poller) is safely interrupted.

This creates a self-contained, declarative, and leak-free unit of work.



Understand Fibers as Lightweight Threads

Rule: Understand that a Fiber is a lightweight, virtual thread managed by the Effect runtime for massive concurrency.

Good Example:

This program demonstrates the efficiency of fibers by forking 100,000 of them. Each fiber does a small amount of work (sleeping for 1 second). Trying to do this with 100,000 OS threads would instantly crash any system.

import { Effect, Fiber } from "effect";

const program = Effect.gen(function* () {
  // Demonstrate the lightweight nature of fibers by creating 100,000 of them
  // This would be impossible with OS threads due to memory and context switching overhead
  const fiberCount = 100_000;
  yield* Effect.log(`Forking ${fiberCount} fibers...`);

  // Create an array of 100,000 simple effects
  // Each effect sleeps for 1 second and then returns its index
  // This simulates lightweight concurrent tasks
  const tasks = Array.from({ length: fiberCount }, (_, i) =>
    Effect.sleep("1 second").pipe(Effect.as(i))
  );

  // Fork all of them into background fibers
  // Effect.fork creates a new fiber for each task without blocking
  // This demonstrates fiber creation scalability - 100k fibers created almost instantly
  // Each fiber is much lighter than an OS thread (typically ~1KB vs ~8MB per thread)
  const fibers = yield* Effect.forEach(tasks, Effect.fork);

  yield* Effect.log(
    "All fibers have been forked. Now waiting for them to complete..."
  );

  // Wait for all fibers to finish their work
  // Fiber.joinAll waits for all fibers to complete and collects their results
  // This demonstrates fiber coordination - managing thousands of concurrent operations
  // The runtime efficiently schedules these fibers using a work-stealing thread pool
  const results = yield* Fiber.joinAll(fibers);

  yield* Effect.log(`All ${results.length} fibers have completed.`);

  // Key insights from this example:
  // 1. Fibers are extremely lightweight - 100k fibers use minimal memory
  // 2. Fiber creation is fast - no expensive OS thread allocation
  // 3. The Effect runtime efficiently schedules fibers across available CPU cores
  // 4. Fibers can be suspended and resumed without blocking OS threads
  // 5. This enables massive concurrency for I/O-bound operations
});

// This program runs successfully, demonstrating the low overhead of fibers.
// Try running this with OS threads - you'd likely hit system limits around 1000-10000 threads
// With fibers, 100k+ concurrent operations are easily achievable
Effect.runPromise(program);

Anti-Pattern:

The anti-pattern is thinking that a Fiber is the same as an OS thread. This can lead to incorrect assumptions about performance and behavior.

  • Don't assume parallelism on CPU-bound tasks: In a standard Node.js environment, all fibers run on a single OS thread. If you run 10 CPU-intensive tasks on 10 fibers, they will not run in parallel on 10 different CPU cores. They will share time on the single main thread. Fibers provide massive concurrency for I/O-bound tasks (like network requests), not CPU-bound parallelism.
  • Don't worry about blocking: A Fiber that is "sleeping" or waiting for I/O (like Effect.sleep or a fetch request) does not block the underlying OS thread. The Effect runtime simply puts it aside and uses the thread to run other ready fibers.

Rationale:

Think of a Fiber as a "virtual thread" or a "green thread." It is the fundamental unit of concurrency in Effect. Every Effect you run is executed on a Fiber. Unlike OS threads, which are heavy and limited, you can create hundreds of thousands or even millions of fibers without issue.


In traditional multi-threaded programming, each thread is managed by the operating system, consumes significant memory (for its stack), and involves expensive context switching. This limits the number of concurrent threads you can realistically create.

Effect's Fibers are different. They are managed entirely by the Effect runtime, not the OS. They are incredibly lightweight data structures that don't have their own OS thread stack. The Effect runtime uses a cooperative scheduling mechanism to run many fibers on a small pool of OS threads (often just one in Node.js).

This model, known as M:N threading (M fibers on N OS threads), allows for a massive level of concurrency that is impossible with traditional threads. It's what makes Effect so powerful for building highly concurrent applications like servers, data pipelines, and real-time systems.

When you use operators like Effect.fork or Effect.all, you are creating new fibers.



Score

Total Score

80/100

Based on repository quality metrics

SKILL.md

SKILL.mdファイルが含まれている

+20
LICENSE

ライセンスが設定されている

+10
説明文

100文字以上の説明がある

0/10
人気

GitHub Stars 500以上

+10
最近の活動

1ヶ月以内に更新

+10
フォーク

10回以上フォークされている

+5
Issue管理

オープンIssueが50未満

+5
言語

プログラミング言語が設定されている

+5
タグ

1つ以上のタグが設定されている

+5

Reviews

💬

Reviews coming soon