
llm-streaming
by yonatangross
The Complete AI Development Toolkit for Claude Code — 159 skills, 34 agents, 20 commands, 144 hooks. Production-ready patterns for FastAPI, React 19, LangGraph, security, and testing.
SKILL.md
name: llm-streaming
description: LLM streaming response patterns. Use when implementing real-time token streaming, Server-Sent Events for AI responses, or streaming with tool calls.
tags: [llm, streaming, sse, real-time]
context: fork
agent: llm-integrator
version: 1.0.0
author: OrchestKit
user-invocable: false
LLM Streaming
Deliver LLM responses in real-time for better UX.
Basic Streaming (OpenAI)
from openai import OpenAI

client = OpenAI()

def stream_response(prompt: str):
    """Stream tokens as they're generated (synchronous client)."""
    stream = client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}],
        stream=True,
    )
    for chunk in stream:
        if chunk.choices[0].delta.content:
            yield chunk.choices[0].delta.content
Streaming with Async
from openai import AsyncOpenAI

client = AsyncOpenAI()

async def async_stream(prompt: str):
    """Async streaming for better concurrency."""
    stream = await client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}],
        stream=True,
    )
    async for chunk in stream:
        if chunk.choices[0].delta.content:
            yield chunk.choices[0].delta.content
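A minimal consumption sketch for the generator above (assumes an OPENAI_API_KEY in the environment; the prompt text is illustrative):

import asyncio

async def main():
    full_response = ""
    # Consume the async generator defined above, updating state per token
    async for token in async_stream("Explain SSE in one sentence"):
        print(token, end="", flush=True)
        full_response += token

asyncio.run(main())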
FastAPI SSE Endpoint
from fastapi import FastAPI
from sse_starlette.sse import EventSourceResponse

app = FastAPI()

@app.get("/chat/stream")
async def stream_chat(prompt: str):
    """Server-Sent Events endpoint for streaming."""
    async def generate():
        async for token in async_stream(prompt):
            yield {"event": "token", "data": token}
        yield {"event": "done", "data": ""}
    return EventSourceResponse(generate())
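A common gap here is holding the model stream open after the browser tab closes. A minimal sketch that polls for disconnects, assuming the async_stream helper above (the /chat/stream/safe route name is illustrative):

from fastapi import Request

@app.get("/chat/stream/safe")
async def stream_chat_safe(prompt: str, request: Request):
    """Stop generating when the client disconnects."""
    async def generate():
        async for token in async_stream(prompt):
            if await request.is_disconnected():
                break  # client went away; stop consuming tokens
            yield {"event": "token", "data": token}
        yield {"event": "done", "data": ""}
    return EventSourceResponse(generate())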
Frontend SSE Consumer
async function streamChat(prompt: string, onToken: (t: string) => void) {
  const response = await fetch("/chat/stream?prompt=" + encodeURIComponent(prompt));
  const reader = response.body?.getReader();
  if (!reader) return;

  const decoder = new TextDecoder();
  let currentEvent = "";

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;

    const text = decoder.decode(value, { stream: true });
    for (const line of text.split("\n")) {
      if (line.startsWith("event: ")) {
        currentEvent = line.slice(7);
      } else if (line.startsWith("data: ")) {
        // Skip the empty "done" event emitted by the server
        if (currentEvent !== "done") {
          onToken(line.slice(6));
        }
      }
    }
  }
}

// Usage
let fullResponse = '';
await streamChat('Hello', (token) => {
  fullResponse += token;
  setDisplayText(fullResponse); // Update UI incrementally
});
Streaming with Tool Calls
async def stream_with_tools(messages: list, tools: list):
    """Handle streaming responses that include tool calls."""
    stream = await client.chat.completions.create(
        model="gpt-4o",
        messages=messages,
        tools=tools,
        stream=True,
    )
    collected_content = ""
    collected_tool_calls = []

    async for chunk in stream:
        delta = chunk.choices[0].delta

        # Collect content tokens
        if delta.content:
            collected_content += delta.content
            yield {"type": "content", "data": delta.content}

        # Collect tool call chunks
        if delta.tool_calls:
            for tc in delta.tool_calls:
                # Tool calls arrive in fragments; accumulate them by index
                if tc.index >= len(collected_tool_calls):
                    collected_tool_calls.append({
                        "id": tc.id,
                        "function": {"name": "", "arguments": ""},
                    })
                if tc.function and tc.function.name:
                    collected_tool_calls[tc.index]["function"]["name"] += tc.function.name
                if tc.function and tc.function.arguments:
                    collected_tool_calls[tc.index]["function"]["arguments"] += tc.function.arguments

    # After the stream ends, surface any accumulated tool calls for execution
    if collected_tool_calls:
        yield {"type": "tool_calls", "data": collected_tool_calls}
Backpressure Handling
import asyncio

async def stream_with_backpressure(prompt: str, max_buffer: int = 100):
    """Handle slow consumers with backpressure."""
    buffer = asyncio.Queue(maxsize=max_buffer)

    async def producer():
        async for token in async_stream(prompt):
            await buffer.put(token)  # Blocks if buffer full
        await buffer.put(None)  # Signal completion

    async def consumer():
        while True:
            token = await buffer.get()
            if token is None:
                break
            yield token
            await asyncio.sleep(0)  # Yield control

    # Start producer in background (keep a reference so it isn't garbage-collected)
    producer_task = asyncio.create_task(producer())

    # Return consumer generator; cancel the producer if the consumer is closed early
    try:
        async for token in consumer():
            yield token
    finally:
        producer_task.cancel()
Key Decisions
| Decision | Recommendation |
|---|---|
| Protocol | SSE for web, WebSocket for bidirectional |
| Buffer size | 50-200 tokens |
| Timeout | 30-60s for long responses |
| Retry | Reconnect on disconnect (see the sketch below) |
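For the Retry row, a reconnecting consumer can be sketched with httpx and exponential backoff (httpx is an assumption here; in the browser, EventSource reconnects automatically):

import asyncio
import httpx

async def consume_sse_with_retry(url: str, max_attempts: int = 5):
    """Reconnect with exponential backoff if the SSE stream drops."""
    for attempt in range(max_attempts):
        try:
            async with httpx.AsyncClient(timeout=httpx.Timeout(60.0)) as client:
                async with client.stream("GET", url) as response:
                    async for line in response.aiter_lines():
                        if line.startswith("data: "):
                            yield line[len("data: "):]
            return  # stream ended normally
        except httpx.HTTPError:
            await asyncio.sleep(2 ** attempt)  # back off before reconnecting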
Common Mistakes
- No timeout (hangs on network issues; see the sketch after this list)
- Missing error handling in stream
- Not closing connections properly
- Buffering entire response (defeats purpose)
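The first two mistakes can be mitigated with a per-token timeout that closes the generator on failure. A minimal sketch assuming the async_stream helper from earlier (anext requires Python 3.10+):

import asyncio

async def stream_with_timeout(prompt: str, token_timeout: float = 30.0):
    """Abort if no token arrives within token_timeout seconds."""
    gen = async_stream(prompt)
    try:
        while True:
            try:
                token = await asyncio.wait_for(anext(gen), timeout=token_timeout)
            except StopAsyncIteration:
                break
            yield token
    finally:
        await gen.aclose()  # release the underlying HTTP connection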
Related Skills
- streaming-api-patterns - SSE/WebSocket deep dive
- function-calling - Tool calls in streams
- react-streaming-ui - React streaming components
Capability Details
token-streaming
Keywords: streaming, token, stream response, real-time, incremental
Solves:
- Stream tokens as they're generated
- Display real-time LLM output
- Reduce time to first byte
sse-responses
Keywords: SSE, Server-Sent Events, event stream, text/event-stream
Solves:
- Implement SSE for streaming
- Handle SSE reconnection
- Parse SSE event data
streaming-with-tools
Keywords: stream tools, tool streaming, function call stream
Solves:
- Stream responses with tool calls
- Handle partial tool call data
- Coordinate streaming and tool execution
partial-json-parsing
Keywords: partial JSON, incremental parse, streaming JSON
Solves:
- Parse JSON as it streams
- Handle incomplete JSON safely (see the sketch after this list)
- Display partial structured data
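A minimal incremental-parse sketch: keep appending streamed fragments and only act once the buffer parses (the helper name is illustrative):

import json

def try_parse_partial_json(buffer: str):
    """Return the parsed object once the accumulated JSON is complete, else None."""
    try:
        return json.loads(buffer)
    except json.JSONDecodeError:
        return None  # still incomplete; keep accumulating chunks

# Example: accumulate streamed tool-call arguments chunk by chunk
buffer = ""
for fragment in ['{"city": "Par', 'is", "units": "metric"}']:
    buffer += fragment
    parsed = try_parse_partial_json(buffer)
    if parsed is not None:
        print(parsed)  # {'city': 'Paris', 'units': 'metric'}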
stream-cancellation
Keywords: cancel, abort, stop stream, AbortController
Solves:
- Cancel ongoing streams
- Handle user interrupts (see the sketch after this list)
- Clean up stream resources
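A minimal cancellation sketch on the Python side, using an asyncio.Event as the user's stop signal (the event and helper names are assumptions, not part of this skill's API; in the browser the equivalent is AbortController):

import asyncio

async def stream_until_cancelled(prompt: str, stop: asyncio.Event):
    """Stop streaming when the user sets the stop event."""
    gen = async_stream(prompt)
    try:
        async for token in gen:
            if stop.is_set():
                break  # user pressed "stop generating"
            yield token
    finally:
        await gen.aclose()  # close the underlying connection promptly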
