
langchain-rate-limits

by jeremylongshore

Hundreds of Claude Code plugins with embedded AI skills. Learn via interactive Jupyter tutorials.

⭐ 1,042 · 🍴 135 · 📅 Jan 23, 2026

SKILL.md


name: langchain-rate-limits
description: |
  Implement LangChain rate limiting and backoff strategies. Use when handling
  API quotas, implementing retry logic, or optimizing request throughput for
  LLM providers. Trigger with phrases like "langchain rate limit", "langchain
  throttling", "langchain backoff", "langchain retry", "API quota".
allowed-tools: Read, Write, Edit
version: 1.0.0
license: MIT
author: Jeremy Longshore <jeremy@intentsolutions.io>

LangChain Rate Limits

Overview

Implement robust rate limiting and retry strategies for LangChain applications to handle API quotas gracefully.

Prerequisites

  • LangChain installed with an LLM provider integration
  • Understanding of your provider's rate limits
  • tenacity package for advanced retry logic (install sketch below)
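
The packages used in this skill can be installed up front; a minimal sketch (keep only the provider integrations you actually use):

pip install -U langchain-openai langchain-anthropic tenacity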

Instructions

Step 1: Understand Provider Limits

# Representative rate limits by provider (actual limits vary by account
# tier and change over time; check your provider dashboard):
RATE_LIMITS = {
    "openai": {
        "gpt-4o": {"rpm": 10000, "tpm": 800000},
        "gpt-4o-mini": {"rpm": 10000, "tpm": 4000000},
    },
    "anthropic": {
        "claude-3-5-sonnet": {"rpm": 4000, "tpm": 400000},
    },
    "google": {
        "gemini-1.5-pro": {"rpm": 360, "tpm": 4000000},
    }
}
# rpm = requests per minute, tpm = tokens per minute
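
One practical use of this table is deriving how far apart requests must be spaced; a minimal sketch (pacing_interval is a hypothetical helper built on the RATE_LIMITS dict above):

def pacing_interval(provider: str, model: str) -> float:
    """Minimum seconds between requests to stay under the rpm limit."""
    return 60.0 / RATE_LIMITS[provider][model]["rpm"]

# e.g. anthropic claude-3-5-sonnet: 60 / 4000 = 0.015s between requests
print(pacing_interval("anthropic", "claude-3-5-sonnet"))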

Step 2: Built-in Retry Configuration

from langchain_openai import ChatOpenAI

# LangChain chat models retry transient errors with exponential backoff
llm = ChatOpenAI(
    model="gpt-4o-mini",
    max_retries=3,  # Retry up to 3 times on transient errors (429s, timeouts)
    request_timeout=30,  # Seconds before a single request times out
)
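
Once max_retries is exhausted, the underlying error still propagates, so production code should catch it. A minimal sketch, assuming the OpenAI SDK's RateLimitError:

from openai import RateLimitError

try:
    reply = llm.invoke("Summarize rate limiting in one sentence.")
except RateLimitError:
    reply = None  # retries exhausted; degrade gracefully or queue for later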

Step 3: Advanced Retry with Tenacity

from tenacity import (
    retry,
    stop_after_attempt,
    wait_exponential,
    retry_if_exception_type
)
from openai import RateLimitError, APIError

@retry(
    stop=stop_after_attempt(5),
    wait=wait_exponential(multiplier=1, min=4, max=60),
    retry=retry_if_exception_type((RateLimitError, APIError))
)
def call_with_retry(chain, input_data):
    """Call chain with exponential backoff."""
    return chain.invoke(input_data)

# Usage
result = call_with_retry(chain, {"input": "Hello"})
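
Tenacity can also log each backoff, which helps when diagnosing quota pressure. A sketch reusing the imports above plus tenacity's before_sleep_log helper:

import logging
from tenacity import before_sleep_log

logger = logging.getLogger(__name__)

@retry(
    stop=stop_after_attempt(5),
    wait=wait_exponential(multiplier=1, min=4, max=60),
    retry=retry_if_exception_type((RateLimitError, APIError)),
    before_sleep=before_sleep_log(logger, logging.WARNING),  # log each wait
)
def call_with_logged_retry(chain, input_data):
    """Same backoff as above, with a warning logged before each sleep."""
    return chain.invoke(input_data)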

Step 4: Rate Limiter Wrapper

import time
from collections import deque
from threading import Lock

class RateLimiter:
    """Sliding-window rate limiter for API calls."""

    def __init__(self, requests_per_minute: int = 60):
        self.rpm = requests_per_minute
        self.timestamps = deque()
        self.lock = Lock()

    def acquire(self):
        """Block until a request can be made without exceeding the limit."""
        with self.lock:
            now = time.time()
            # Drop timestamps that have aged out of the 60-second window
            while self.timestamps and now - self.timestamps[0] > 60:
                self.timestamps.popleft()

            if len(self.timestamps) >= self.rpm:
                # Sleep until the oldest request leaves the window.
                # Sleeping while holding the lock intentionally queues
                # concurrent callers behind this one.
                sleep_time = 60 - (now - self.timestamps[0])
                if sleep_time > 0:
                    time.sleep(sleep_time)

            self.timestamps.append(time.time())

# Usage with LangChain
rate_limiter = RateLimiter(requests_per_minute=100)

def rate_limited_call(chain, input_data):
    rate_limiter.acquire()
    return chain.invoke(input_data)
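
The limiter composes naturally with the tenacity decorator from Step 3. A sketch (safe_call is a hypothetical name; putting acquire() inside the retried function means every retry attempt also passes through the limiter):

@retry(
    stop=stop_after_attempt(5),
    wait=wait_exponential(multiplier=1, min=4, max=60),
    retry=retry_if_exception_type((RateLimitError, APIError)),
)
def safe_call(chain, input_data):
    """Throttle, then invoke; each retry attempt re-acquires a slot."""
    rate_limiter.acquire()
    return chain.invoke(input_data)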

Step 5: Async Rate Limiting

import asyncio
from asyncio import Semaphore

class AsyncRateLimiter:
    """Caps concurrent in-flight requests with a semaphore.

    Note: this bounds parallelism, not requests per minute.
    """

    def __init__(self, max_concurrent: int = 10):
        self.semaphore = Semaphore(max_concurrent)

    async def call(self, chain, input_data):
        async with self.semaphore:
            return await chain.ainvoke(input_data)

# Batch processing with rate limiting
async def process_batch(chain, inputs: list, max_concurrent: int = 5):
    limiter = AsyncRateLimiter(max_concurrent)
    tasks = [limiter.call(chain, inp) for inp in inputs]
    return await asyncio.gather(*tasks, return_exceptions=True)
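
A semaphore alone caps parallelism but not requests per minute; both can be combined. A sketch (AsyncPacedLimiter is a hypothetical name, reusing the asyncio import above):

import time

class AsyncPacedLimiter:
    """Caps concurrency and spaces out request starts for an rpm budget."""

    def __init__(self, max_concurrent: int = 10, requests_per_minute: int = 60):
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.interval = 60.0 / requests_per_minute
        self._next_start = 0.0
        self._lock = asyncio.Lock()

    async def call(self, chain, input_data):
        # Reserve the next start slot under the lock, then sleep outside it
        async with self._lock:
            now = time.monotonic()
            wait = max(0.0, self._next_start - now)
            self._next_start = max(now, self._next_start) + self.interval
        if wait:
            await asyncio.sleep(wait)
        async with self.semaphore:
            return await chain.ainvoke(input_data)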

Output

  • Configured retry logic with exponential backoff
  • Rate limiter class for request throttling
  • Async batch processing with concurrency control
  • Graceful handling of rate limit errors

Examples

Handling Rate Limits in Production

from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnableConfig

llm = ChatOpenAI(
    model="gpt-4o-mini",
    max_retries=5,
)
chain = ChatPromptTemplate.from_template("{input}") | llm

# Use batch with max_concurrency
inputs = [{"input": f"Query {i}"} for i in range(100)]

results = chain.batch(
    inputs,
    config=RunnableConfig(max_concurrency=10)  # Limit concurrent calls
)
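
batch can also surface per-item failures instead of aborting the whole run; a sketch using its return_exceptions flag:

results = chain.batch(
    inputs,
    config=RunnableConfig(max_concurrency=10),
    return_exceptions=True,  # failed items come back as Exception objects
)
failures = [r for r in results if isinstance(r, Exception)]
print(f"{len(failures)} of {len(inputs)} requests failed")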

Fallback on Rate Limit

from langchain_openai import ChatOpenAI
from langchain_anthropic import ChatAnthropic

primary = ChatOpenAI(model="gpt-4o-mini", max_retries=2)
fallback = ChatAnthropic(model="claude-3-5-sonnet-20241022")

# Falls back to Anthropic when the primary raises (any exception, by default)
robust_llm = primary.with_fallbacks([fallback])
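
By default, with_fallbacks fires on any exception; it can be narrowed so only rate-limit errors trigger the switch. A sketch assuming the OpenAI SDK's RateLimitError:

from openai import RateLimitError

rate_limit_fallback = primary.with_fallbacks(
    [fallback],
    exceptions_to_handle=(RateLimitError,),  # fall back only on 429s
)
result = rate_limit_fallback.invoke("Hello")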

Error Handling

| Error | Cause | Solution |
|---|---|---|
| RateLimitError | Exceeded quota | Implement backoff, reduce concurrency |
| Timeout | Request too slow | Increase timeout, check network |
| 429 Too Many Requests | API throttled | Wait and retry with backoff |
| Quota Exceeded | Monthly limit hit | Upgrade plan or switch provider |
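
In code, these map to exception types from the provider SDK; a minimal dispatch sketch using the OpenAI SDK's exceptions:

from openai import APITimeoutError, RateLimitError

try:
    result = chain.invoke({"input": "Hello"})
except RateLimitError:
    pass  # back off and retry (see Step 3)
except APITimeoutError:
    pass  # raise the client timeout or check the network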


Next Steps

Proceed to langchain-security-basics for security best practices.

