Back to list
yonatangross

grpc-python

by yonatangross

The Complete AI Development Toolkit for Claude Code — 159 skills, 34 agents, 20 commands, 144 hooks. Production-ready patterns for FastAPI, React 19, LangGraph, security, and testing.

29🍴 4📅 Jan 23, 2026

SKILL.md


name: grpc-python description: gRPC with Python using grpcio and protobuf for high-performance microservice communication. Use when implementing service-to-service APIs, streaming data, or building polyglot microservices requiring strong typing. context: fork agent: backend-system-architect version: 1.0.0 tags: [grpc, protobuf, microservices, rpc, streaming, python, 2026] author: OrchestKit user-invocable: false

gRPC Python Patterns

High-performance RPC framework for microservice communication.

Overview

  • Internal microservice communication (lower latency than REST)
  • Streaming data (real-time updates, file transfers)
  • Polyglot environments (shared proto definitions)
  • Strong typing between services (compile-time validation)
  • Bidirectional streaming (chat, gaming, real-time sync)

When NOT to Use

  • Public APIs (prefer REST/GraphQL for browser compatibility)
  • Simple CRUD with few services (REST is simpler)
  • When HTTP/2 is not available

Proto Definition

// protos/user_service.proto
syntax = "proto3";
package user.v1;

import "google/protobuf/timestamp.proto";
import "google/protobuf/empty.proto";

service UserService {
  rpc GetUser(GetUserRequest) returns (User);
  rpc CreateUser(CreateUserRequest) returns (User);
  rpc ListUsers(ListUsersRequest) returns (stream User);  // Server streaming
  rpc BulkCreateUsers(stream CreateUserRequest) returns (BulkCreateResponse);  // Client streaming
  rpc UserUpdates(stream UserUpdateRequest) returns (stream User);  // Bidirectional
}

message User {
  string id = 1;
  string email = 2;
  string name = 3;
  UserStatus status = 4;
  google.protobuf.Timestamp created_at = 5;
}

enum UserStatus {
  USER_STATUS_UNSPECIFIED = 0;
  USER_STATUS_ACTIVE = 1;
  USER_STATUS_INACTIVE = 2;
}

message GetUserRequest { string user_id = 1; }
message CreateUserRequest { string email = 1; string name = 2; string password = 3; }
message ListUsersRequest { int32 page_size = 1; string page_token = 2; }
message BulkCreateResponse { int32 created_count = 1; repeated string user_ids = 2; }

Code Generation

pip install grpcio grpcio-tools
python -m grpc_tools.protoc -I./protos --python_out=./app/protos --pyi_out=./app/protos --grpc_python_out=./app/protos ./protos/user_service.proto

Server Implementation

import grpc
from concurrent import futures
from google.protobuf.timestamp_pb2 import Timestamp
from app.protos import user_service_pb2 as pb2
from app.protos import user_service_pb2_grpc as pb2_grpc

class UserServiceServicer(pb2_grpc.UserServiceServicer):
    def __init__(self, user_repo):
        self.user_repo = user_repo

    def GetUser(self, request, context):
        user = self.user_repo.get(request.user_id)
        if not user:
            context.abort(grpc.StatusCode.NOT_FOUND, f"User {request.user_id} not found")
        return self._to_proto(user)

    def CreateUser(self, request, context):
        if not request.email or "@" not in request.email:
            context.abort(grpc.StatusCode.INVALID_ARGUMENT, "Invalid email")
        if self.user_repo.get_by_email(request.email):
            context.abort(grpc.StatusCode.ALREADY_EXISTS, "Email already registered")
        user = self.user_repo.create(email=request.email, name=request.name)
        return self._to_proto(user)

    def ListUsers(self, request, context):
        """Server streaming: yield users one by one."""
        for user in self.user_repo.iterate(page_size=request.page_size or 100):
            if not context.is_active():
                return
            yield self._to_proto(user)

    def BulkCreateUsers(self, request_iterator, context):
        """Client streaming: receive multiple requests."""
        created_ids = []
        for request in request_iterator:
            try:
                user = self.user_repo.create(email=request.email, name=request.name)
                created_ids.append(user.id)
            except Exception as e:
                pass  # Log error, continue
        return pb2.BulkCreateResponse(created_count=len(created_ids), user_ids=created_ids)

    def _to_proto(self, user) -> pb2.User:
        created_at = Timestamp()
        created_at.FromDatetime(user.created_at)
        return pb2.User(id=user.id, email=user.email, name=user.name, created_at=created_at)

def serve():
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    pb2_grpc.add_UserServiceServicer_to_server(UserServiceServicer(user_repo), server)
    from grpc_health.v1 import health, health_pb2_grpc
    health_pb2_grpc.add_HealthServicer_to_server(health.HealthServicer(), server)
    server.add_insecure_port("[::]:50051")
    server.start()
    server.wait_for_termination()

Async Server (grpcio >= 1.50)

import grpc.aio

class AsyncUserServiceServicer(pb2_grpc.UserServiceServicer):
    async def GetUser(self, request, context):
        user = await self.user_repo.get(request.user_id)
        if not user:
            await context.abort(grpc.StatusCode.NOT_FOUND, "User not found")
        return self._to_proto(user)

async def serve_async():
    server = grpc.aio.server()
    pb2_grpc.add_UserServiceServicer_to_server(AsyncUserServiceServicer(), server)
    server.add_insecure_port("[::]:50051")
    await server.start()
    await server.wait_for_termination()

Client Implementation

class UserServiceClient:
    def __init__(self, host: str = "localhost:50051"):
        self.channel = grpc.insecure_channel(host, options=[
            ("grpc.keepalive_time_ms", 30000),
            ("grpc.keepalive_timeout_ms", 10000),
        ])
        self.stub = pb2_grpc.UserServiceStub(self.channel)

    def get_user(self, user_id: str, timeout: float = 5.0):
        try:
            return self.stub.GetUser(pb2.GetUserRequest(user_id=user_id), timeout=timeout)
        except grpc.RpcError as e:
            if e.code() == grpc.StatusCode.NOT_FOUND:
                raise UserNotFoundError(user_id)
            raise

    def list_users(self, page_size: int = 100):
        for user in self.stub.ListUsers(pb2.ListUsersRequest(page_size=page_size)):
            yield user

    def close(self):
        self.channel.close()

Interceptors

class LoggingInterceptor(grpc.ServerInterceptor):
    def intercept_service(self, continuation, handler_call_details):
        start = time.time()
        handler = continuation(handler_call_details)
        logger.info(f"{handler_call_details.method} in {time.time() - start:.3f}s")
        return handler

class AuthInterceptor(grpc.ServerInterceptor):
    def __init__(self, auth_service):
        self.auth_service = auth_service
        self.public_methods = {"/user.v1.UserService/CreateUser"}

    def intercept_service(self, continuation, handler_call_details):
        if handler_call_details.method not in self.public_methods:
            metadata = dict(handler_call_details.invocation_metadata)
            token = metadata.get("authorization", "").replace("Bearer ", "")
            if not token or not self.auth_service.verify(token):
                return grpc.unary_unary_rpc_method_handler(
                    lambda req, ctx: ctx.abort(grpc.StatusCode.UNAUTHENTICATED, "Invalid token")
                )
        return continuation(handler_call_details)

class RetryInterceptor(grpc.UnaryUnaryClientInterceptor):
    def __init__(self, max_retries: int = 3):
        self.max_retries = max_retries
        self.retry_codes = {grpc.StatusCode.UNAVAILABLE, grpc.StatusCode.DEADLINE_EXCEEDED}

    def intercept_unary_unary(self, continuation, client_call_details, request):
        for attempt in range(self.max_retries):
            try:
                return continuation(client_call_details, request)
            except grpc.RpcError as e:
                if e.code() not in self.retry_codes or attempt == self.max_retries - 1:
                    raise
                time.sleep(2 ** attempt)

Key Decisions

DecisionRecommendation
Proto organizationOne service per file, shared messages in common.proto
VersioningPackage version (user.v1, user.v2), backward compatible
StreamingServer stream for large lists, bidirectional for real-time
Error codesUse standard gRPC codes, add details for validation
AuthInterceptor with metadata, JWT tokens
TimeoutsAlways set client-side deadlines
Health checksRequired for load balancers

Anti-Patterns (FORBIDDEN)

# NEVER skip deadline/timeout
stub.GetUser(request)  # Can hang forever! Use timeout=5.0

# NEVER ignore streaming cancellation
def ListUsers(self, request, context):
    for user in all_users:
        if not context.is_active():  # Check if client disconnected
            return
        yield user

# NEVER return None for message fields
# NEVER use proto2 syntax for new services
# ALWAYS close channels to prevent resource leaks
  • api-design-framework - REST/OpenAPI patterns
  • strawberry-graphql - GraphQL alternative
  • streaming-api-patterns - SSE/WebSocket patterns

Score

Total Score

75/100

Based on repository quality metrics

SKILL.md

SKILL.mdファイルが含まれている

+20
LICENSE

ライセンスが設定されている

+10
説明文

100文字以上の説明がある

+10
人気

GitHub Stars 100以上

0/15
最近の活動

1ヶ月以内に更新

+10
フォーク

10回以上フォークされている

0/5
Issue管理

オープンIssueが50未満

+5
言語

プログラミング言語が設定されている

+5
タグ

1つ以上のタグが設定されている

+5

Reviews

💬

Reviews coming soon