← Back to list

grpc-python
by yonatangross
The Complete AI Development Toolkit for Claude Code — 159 skills, 34 agents, 20 commands, 144 hooks. Production-ready patterns for FastAPI, React 19, LangGraph, security, and testing.
⭐ 29🍴 4📅 Jan 23, 2026
SKILL.md
name: grpc-python description: gRPC with Python using grpcio and protobuf for high-performance microservice communication. Use when implementing service-to-service APIs, streaming data, or building polyglot microservices requiring strong typing. context: fork agent: backend-system-architect version: 1.0.0 tags: [grpc, protobuf, microservices, rpc, streaming, python, 2026] author: OrchestKit user-invocable: false
gRPC Python Patterns
High-performance RPC framework for microservice communication.
Overview
- Internal microservice communication (lower latency than REST)
- Streaming data (real-time updates, file transfers)
- Polyglot environments (shared proto definitions)
- Strong typing between services (compile-time validation)
- Bidirectional streaming (chat, gaming, real-time sync)
When NOT to Use
- Public APIs (prefer REST/GraphQL for browser compatibility)
- Simple CRUD with few services (REST is simpler)
- When HTTP/2 is not available
Proto Definition
// protos/user_service.proto
syntax = "proto3";
package user.v1;
import "google/protobuf/timestamp.proto";
import "google/protobuf/empty.proto";
service UserService {
rpc GetUser(GetUserRequest) returns (User);
rpc CreateUser(CreateUserRequest) returns (User);
rpc ListUsers(ListUsersRequest) returns (stream User); // Server streaming
rpc BulkCreateUsers(stream CreateUserRequest) returns (BulkCreateResponse); // Client streaming
rpc UserUpdates(stream UserUpdateRequest) returns (stream User); // Bidirectional
}
message User {
string id = 1;
string email = 2;
string name = 3;
UserStatus status = 4;
google.protobuf.Timestamp created_at = 5;
}
enum UserStatus {
USER_STATUS_UNSPECIFIED = 0;
USER_STATUS_ACTIVE = 1;
USER_STATUS_INACTIVE = 2;
}
message GetUserRequest { string user_id = 1; }
message CreateUserRequest { string email = 1; string name = 2; string password = 3; }
message ListUsersRequest { int32 page_size = 1; string page_token = 2; }
message BulkCreateResponse { int32 created_count = 1; repeated string user_ids = 2; }
Code Generation
pip install grpcio grpcio-tools
python -m grpc_tools.protoc -I./protos --python_out=./app/protos --pyi_out=./app/protos --grpc_python_out=./app/protos ./protos/user_service.proto
Server Implementation
import grpc
from concurrent import futures
from google.protobuf.timestamp_pb2 import Timestamp
from app.protos import user_service_pb2 as pb2
from app.protos import user_service_pb2_grpc as pb2_grpc
class UserServiceServicer(pb2_grpc.UserServiceServicer):
def __init__(self, user_repo):
self.user_repo = user_repo
def GetUser(self, request, context):
user = self.user_repo.get(request.user_id)
if not user:
context.abort(grpc.StatusCode.NOT_FOUND, f"User {request.user_id} not found")
return self._to_proto(user)
def CreateUser(self, request, context):
if not request.email or "@" not in request.email:
context.abort(grpc.StatusCode.INVALID_ARGUMENT, "Invalid email")
if self.user_repo.get_by_email(request.email):
context.abort(grpc.StatusCode.ALREADY_EXISTS, "Email already registered")
user = self.user_repo.create(email=request.email, name=request.name)
return self._to_proto(user)
def ListUsers(self, request, context):
"""Server streaming: yield users one by one."""
for user in self.user_repo.iterate(page_size=request.page_size or 100):
if not context.is_active():
return
yield self._to_proto(user)
def BulkCreateUsers(self, request_iterator, context):
"""Client streaming: receive multiple requests."""
created_ids = []
for request in request_iterator:
try:
user = self.user_repo.create(email=request.email, name=request.name)
created_ids.append(user.id)
except Exception as e:
pass # Log error, continue
return pb2.BulkCreateResponse(created_count=len(created_ids), user_ids=created_ids)
def _to_proto(self, user) -> pb2.User:
created_at = Timestamp()
created_at.FromDatetime(user.created_at)
return pb2.User(id=user.id, email=user.email, name=user.name, created_at=created_at)
def serve():
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
pb2_grpc.add_UserServiceServicer_to_server(UserServiceServicer(user_repo), server)
from grpc_health.v1 import health, health_pb2_grpc
health_pb2_grpc.add_HealthServicer_to_server(health.HealthServicer(), server)
server.add_insecure_port("[::]:50051")
server.start()
server.wait_for_termination()
Async Server (grpcio >= 1.50)
import grpc.aio
class AsyncUserServiceServicer(pb2_grpc.UserServiceServicer):
async def GetUser(self, request, context):
user = await self.user_repo.get(request.user_id)
if not user:
await context.abort(grpc.StatusCode.NOT_FOUND, "User not found")
return self._to_proto(user)
async def serve_async():
server = grpc.aio.server()
pb2_grpc.add_UserServiceServicer_to_server(AsyncUserServiceServicer(), server)
server.add_insecure_port("[::]:50051")
await server.start()
await server.wait_for_termination()
Client Implementation
class UserServiceClient:
def __init__(self, host: str = "localhost:50051"):
self.channel = grpc.insecure_channel(host, options=[
("grpc.keepalive_time_ms", 30000),
("grpc.keepalive_timeout_ms", 10000),
])
self.stub = pb2_grpc.UserServiceStub(self.channel)
def get_user(self, user_id: str, timeout: float = 5.0):
try:
return self.stub.GetUser(pb2.GetUserRequest(user_id=user_id), timeout=timeout)
except grpc.RpcError as e:
if e.code() == grpc.StatusCode.NOT_FOUND:
raise UserNotFoundError(user_id)
raise
def list_users(self, page_size: int = 100):
for user in self.stub.ListUsers(pb2.ListUsersRequest(page_size=page_size)):
yield user
def close(self):
self.channel.close()
Interceptors
class LoggingInterceptor(grpc.ServerInterceptor):
def intercept_service(self, continuation, handler_call_details):
start = time.time()
handler = continuation(handler_call_details)
logger.info(f"{handler_call_details.method} in {time.time() - start:.3f}s")
return handler
class AuthInterceptor(grpc.ServerInterceptor):
def __init__(self, auth_service):
self.auth_service = auth_service
self.public_methods = {"/user.v1.UserService/CreateUser"}
def intercept_service(self, continuation, handler_call_details):
if handler_call_details.method not in self.public_methods:
metadata = dict(handler_call_details.invocation_metadata)
token = metadata.get("authorization", "").replace("Bearer ", "")
if not token or not self.auth_service.verify(token):
return grpc.unary_unary_rpc_method_handler(
lambda req, ctx: ctx.abort(grpc.StatusCode.UNAUTHENTICATED, "Invalid token")
)
return continuation(handler_call_details)
class RetryInterceptor(grpc.UnaryUnaryClientInterceptor):
def __init__(self, max_retries: int = 3):
self.max_retries = max_retries
self.retry_codes = {grpc.StatusCode.UNAVAILABLE, grpc.StatusCode.DEADLINE_EXCEEDED}
def intercept_unary_unary(self, continuation, client_call_details, request):
for attempt in range(self.max_retries):
try:
return continuation(client_call_details, request)
except grpc.RpcError as e:
if e.code() not in self.retry_codes or attempt == self.max_retries - 1:
raise
time.sleep(2 ** attempt)
Key Decisions
| Decision | Recommendation |
|---|---|
| Proto organization | One service per file, shared messages in common.proto |
| Versioning | Package version (user.v1, user.v2), backward compatible |
| Streaming | Server stream for large lists, bidirectional for real-time |
| Error codes | Use standard gRPC codes, add details for validation |
| Auth | Interceptor with metadata, JWT tokens |
| Timeouts | Always set client-side deadlines |
| Health checks | Required for load balancers |
Anti-Patterns (FORBIDDEN)
# NEVER skip deadline/timeout
stub.GetUser(request) # Can hang forever! Use timeout=5.0
# NEVER ignore streaming cancellation
def ListUsers(self, request, context):
for user in all_users:
if not context.is_active(): # Check if client disconnected
return
yield user
# NEVER return None for message fields
# NEVER use proto2 syntax for new services
# ALWAYS close channels to prevent resource leaks
Related Skills
api-design-framework- REST/OpenAPI patternsstrawberry-graphql- GraphQL alternativestreaming-api-patterns- SSE/WebSocket patterns
Score
Total Score
75/100
Based on repository quality metrics
✓SKILL.md
SKILL.mdファイルが含まれている
+20
✓LICENSE
ライセンスが設定されている
+10
✓説明文
100文字以上の説明がある
+10
○人気
GitHub Stars 100以上
0/15
✓最近の活動
1ヶ月以内に更新
+10
○フォーク
10回以上フォークされている
0/5
✓Issue管理
オープンIssueが50未満
+5
✓言語
プログラミング言語が設定されている
+5
✓タグ
1つ以上のタグが設定されている
+5
Reviews
💬
Reviews coming soon
