gRPCサービス - 解説
gRPCの内部アーキテクチャ
HTTP/2の基礎
gRPCは、HTTP/2プロトコルの上に構築されています。HTTP/2の理解は、gRPCのパフォーマンス特性を理解する上で不可欠です。
HTTP/1.1とHTTP/2の違い
HTTP/1.1の制約:
クライアント サーバー
| |
|--- リクエスト1 -------------->|
|<-- レスポンス1 ---------------|
| |
|--- リクエスト2 -------------->|
|<-- レスポンス2 ---------------|
| |
順次処理(Head-of-Line Blocking)
HTTP/2の多重化:
クライアント サーバー
| |
|====== 単一TCP接続 ===========|
| |
|-- Stream 1 (req) ----------->|
|-- Stream 2 (req) ----------->|
|<- Stream 1 (res) ------------|
|-- Stream 3 (req) ----------->|
|<- Stream 2 (res) ------------|
|<- Stream 3 (res) ------------|
| |
並行処理(Multiplexing)
HTTP/2のフレーム構造
HTTP/2は、データをフレーム単位で送信します。gRPCは、以下のフレームタイプを使用します:
+-----------------------------------------------+
| Frame Header (9 bytes) |
+---------------+---------------+---------------+
| Length (24) | Type (8) | Flags (8) |
+---------------+-------------------------------+
| Stream ID (32) |
+-----------------------------------------------+
| Frame Payload |
+-----------------------------------------------+
主要なフレームタイプ:
- HEADERS: HTTPヘッダー(gRPCメタデータ)
- DATA: メッセージペイロード(Protocol Buffers)
- RST_STREAM: ストリームのリセット(エラー時)
- SETTINGS: 接続設定(ウィンドウサイズなど)
- WINDOW_UPDATE: フロー制御
- PING: 接続確認
- GOAWAY: 接続終了
gRPCメッセージのエンコーディング
gRPCは、HTTP/2のDATAフレームに以下の形式でメッセージを格納します:
+-------------------+
| Compressed Flag | 1 byte (0: 非圧縮, 1: 圧縮)
+-------------------+
| Message Length | 4 bytes (ビッグエンディアン)
+-------------------+
| Message Data | Length bytes (Protocol Buffers)
+-------------------+
実装例:
// gRPCメッセージのエンコーディング(簡略版)
func encodeMessage(msg proto.Message) ([]byte, error) {
// Protocol Buffersのシリアライズ
data, err := proto.Marshal(msg)
if err != nil {
return nil, err
}
// gRPCフレームの構築
frame := make([]byte, 5+len(data))
frame[0] = 0 // 非圧縮フラグ
binary.BigEndian.PutUint32(frame[1:5], uint32(len(data)))
copy(frame[5:], data)
return frame, nil
}
gRPCのレイヤーアーキテクチャ
gRPCは、以下の階層で構成されています:
┌─────────────────────────────────────────┐
│ Application Layer │
│ (ビジネスロジック) │
└─────────────────────────────────────────┘
↕
┌─────────────────────────────────────────┐
│ Stub Layer │
│ (自動生成されたクライアント/サーバーコード) │
└─────────────────────────────────────────┘
↕
┌─────────────────────────────────────────┐
│ gRPC Core │
│ (Interceptor、エラーハンドリング) │
└─────────────────────────────────────────┘
↕
┌─────────────────────────────────────────┐
│ Transport Layer │
│ (HTTP/2実装、フロー制御) │
└─────────────────────────────────────────┘
↕
┌─────────────────────────────────────────┐
│ Network Layer │
│ (TCP/IP、TLS) │
└─────────────────────────────────────────┘
チャネル(Channel)の構造
gRPCのチャネルは、サーバーへの接続を管理します:
// チャネルの内部構造(概念図)
type Channel struct {
target string // 接続先アドレス
balancer Balancer // 負荷分散器
subConns []SubConnection // サブコネクション
resolver Resolver // サービスディスカバリ
interceptors []Interceptor // インターセプター
callOptions []CallOption // デフォルトオプション
}
// サブコネクションの管理
type SubConnection struct {
addr string // 実際の接続先
httpClient *http2.Client // HTTP/2クライアント
state connectivity.State // 接続状態
streams map[int]*Stream // アクティブなストリーム
}
---
4つの通信パターンの使い分け
1. Unary RPC(単項RPC)
特徴:
- 1つのリクエスト → 1つのレスポンス
- RESTful APIに最も近い
- 最もシンプルで理解しやすい
適用シナリオ:
- CRUD操作(作成、読み取り、更新、削除)
- データ検証
- 認証・認可
実装パターン:
// シンプルなUnary RPC
func (s *server) GetUser(ctx context.Context, req *pb.GetUserRequest) (*pb.User, error) {
// 1. バリデーション
if req.UserId == "" {
return nil, status.Error(codes.InvalidArgument, "user_id is required")
}
// 2. ビジネスロジック
user, err := s.db.FindUser(req.UserId)
if err != nil {
return nil, status.Error(codes.NotFound, "user not found")
}
// 3. レスポンス返却
return &pb.User{
Id: user.ID,
Username: user.Username,
Email: user.Email,
}, nil
}
パフォーマンス考慮点:
- コネクションプーリング:複数のリクエストで接続を再利用
- タイムアウト設定:長時間処理の防止
- キャッシング:頻繁に読まれるデータのキャッシュ
2. Server Streaming RPC(サーバーストリーミング)
特徴:
- 1つのリクエスト → 複数のレスポンス(ストリーム)
- サーバーが順次データを送信
- クライアントは受信完了まで待機
適用シナリオ:
- 大量データの取得(ページネーション)
- ログの配信
- リアルタイム更新の通知
実装パターン:
// ページネーション付きリスト取得
func (s *server) ListUsers(req *pb.ListUsersRequest, stream pb.UserService_ListUsersServer) error {
cursor := req.PageToken
pageSize := req.PageSize
if pageSize == 0 {
pageSize = 100
}
for {
// データベースからページを取得
users, nextCursor, err := s.db.GetUsersPage(cursor, pageSize)
if err != nil {
return status.Errorf(codes.Internal, "database error: %v", err)
}
// 各ユーザーを送信
for _, user := range users {
if err := stream.Send(&pb.User{
Id: user.ID,
Username: user.Username,
Email: user.Email,
}); err != nil {
return status.Errorf(codes.Internal, "send error: %v", err)
}
}
// 最終ページに到達
if nextCursor == "" {
break
}
cursor = nextCursor
}
return nil
}
バックプレッシャー(Back Pressure):
サーバーがクライアントより速くデータを送信すると、バッファが溢れる可能性があります。HTTP/2のフロー制御が自動的に処理します:
サーバー クライアント
| |
|-- DATA (64KB) ---------------->|
|-- DATA (64KB) ---------------->|
| | バッファが満杯に近づく
|<-- WINDOW_UPDATE -------------|
| | 処理完了、ウィンドウ更新
|-- DATA (64KB) ---------------->|
| |
3. Client Streaming RPC(クライアントストリーミング)
特徴:
- 複数のリクエスト(ストリーム) → 1つのレスポンス
- クライアントが順次データを送信
- サーバーはすべて受信後にレスポンス
適用シナリオ:
- ファイルアップロード
- バッチデータ送信
- ログ収集
実装パターン:
// ファイルアップロードの実装
func (s *server) UploadFile(stream pb.FileService_UploadFileServer) error {
var file *os.File
var totalSize int64
for {
chunk, err := stream.Recv()
if err == io.EOF {
// すべてのチャンクを受信完了
file.Close()
return stream.SendAndClose(&pb.UploadResponse{
FileId: generateFileID(),
Size: totalSize,
Checksum: calculateChecksum(file),
})
}
if err != nil {
if file != nil {
file.Close()
os.Remove(file.Name())
}
return status.Errorf(codes.Internal, "receive error: %v", err)
}
// 初回チャンクでファイル作成
if file == nil {
file, err = os.Create(fmt.Sprintf("/tmp/%s", chunk.Filename))
if err != nil {
return status.Errorf(codes.Internal, "file create error: %v", err)
}
}
// データを書き込み
n, err := file.Write(chunk.Data)
if err != nil {
file.Close()
os.Remove(file.Name())
return status.Errorf(codes.Internal, "write error: %v", err)
}
totalSize += int64(n)
}
}
クライアント側の実装:
func uploadFile(client pb.FileServiceClient, filePath string) error {
file, err := os.Open(filePath)
if err != nil {
return err
}
defer file.Close()
stream, err := client.UploadFile(context.Background())
if err != nil {
return err
}
buffer := make([]byte, 1024*1024) // 1MBチャンク
for {
n, err := file.Read(buffer)
if err == io.EOF {
break
}
if err != nil {
return err
}
chunk := &pb.FileChunk{
Filename: filepath.Base(filePath),
Data: buffer[:n],
}
if err := stream.Send(chunk); err != nil {
return err
}
}
resp, err := stream.CloseAndRecv()
if err != nil {
return err
}
fmt.Printf("Upload complete: %s (%d bytes)\n", resp.FileId, resp.Size)
return nil
}
4. Bidirectional Streaming RPC(双方向ストリーミング)
特徴:
- 複数のリクエスト(ストリーム) ⇄ 複数のレスポンス(ストリーム)
- クライアントとサーバーが独立して送受信
- 最も柔軟だが、最も複雑
適用シナリオ:
- リアルタイムチャット
- ゲームサーバー
- IoTデータストリーム
- コラボレーションツール
実装パターン:
// リアルタイムチャットの実装
func (s *server) Chat(stream pb.ChatService_ChatServer) error {
// クライアント情報の登録
clientID := generateClientID()
s.registerClient(clientID, stream)
defer s.unregisterClient(clientID)
// 受信専用ゴルーチン
errChan := make(chan error, 1)
go func() {
for {
msg, err := stream.Recv()
if err == io.EOF {
errChan <- nil
return
}
if err != nil {
errChan <- err
return
}
// メッセージをブロードキャスト
s.broadcastMessage(msg)
}
}()
// 送信専用ゴルーチン(サーバー起動時のメッセージなど)
go func() {
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
for range ticker.C {
// 定期的にpingメッセージを送信(接続維持)
if err := stream.Send(&pb.ChatMessage{
Type: pb.MessageType_MESSAGE_TYPE_SYSTEM,
Content: "ping",
}); err != nil {
return
}
}
}()
// エラーまたは正常終了を待機
return <-errChan
}
同期パターン vs 非同期パターン:
// 同期パターン(順次処理)
func (s *server) ChatSync(stream pb.ChatService_ChatServer) error {
for {
msg, err := stream.Recv()
if err == io.EOF {
return nil
}
if err != nil {
return err
}
// 受信したメッセージに対してすぐにレスポンス
response := processMessage(msg)
if err := stream.Send(response); err != nil {
return err
}
}
}
// 非同期パターン(独立した送受信)
func (s *server) ChatAsync(stream pb.ChatService_ChatServer) error {
// 受信ゴルーチン
go func() {
for {
msg, err := stream.Recv()
if err != nil {
return
}
s.messageQueue <- msg // キューに追加
}
}()
// 送信ゴルーチン
for msg := range s.messageQueue {
response := processMessage(msg)
if err := stream.Send(response); err != nil {
return err
}
}
return nil
}
---
パフォーマンス最適化
接続プーリング
gRPCは、デフォルトで接続プーリングを実装していますが、明示的に制御することも可能です。
// 接続プールの設定
const (
maxIdleConns = 100
maxConnsPerHost = 10
idleConnTimeout = 90 * time.Second
tlsHandshakeTimeout = 10 * time.Second
)
// カスタムTransport設定
opts := []grpc.DialOption{
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultCallOptions(
grpc.MaxCallRecvMsgSize(10*1024*1024), // 10MB
grpc.MaxCallSendMsgSize(10*1024*1024),
),
}
conn, err := grpc.Dial(target, opts...)
接続の再利用:
// ✅ 良い例:接続を再利用
var globalConn *grpc.ClientConn
func init() {
conn, err := grpc.Dial(target, opts...)
if err != nil {
panic(err)
}
globalConn = conn
}
func callService() {
client := pb.NewUserServiceClient(globalConn)
// 複数回呼び出しても同じ接続を使用
client.GetUser(ctx, req)
}
// ❌ 悪い例:毎回新しい接続を作成
func callServiceBad() {
conn, _ := grpc.Dial(target, opts...) // 毎回接続
defer conn.Close()
client := pb.NewUserServiceClient(conn)
client.GetUser(ctx, req)
}
Keep-Alive設定
長時間接続を維持するための設定:
// サーバー側のKeep-Alive設定
s := grpc.NewServer(
grpc.KeepaliveParams(keepalive.ServerParameters{
Time: 30 * time.Second, // Pingを送信する間隔
Timeout: 10 * time.Second, // Ping応答のタイムアウト
}),
grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
MinTime: 5 * time.Second, // クライアントPingの最小間隔
PermitWithoutStream: true, // ストリームなしでもPing許可
}),
)
// クライアント側のKeep-Alive設定
conn, err := grpc.Dial(
target,
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: 30 * time.Second,
Timeout: 10 * time.Second,
PermitWithoutStream: true,
}),
)
Keep-Aliveの動作フロー:
クライアント サーバー
| |
|====== アイドル状態 ===========|
| |
[30秒経過] |
| |
|-- PING --------------------- >|
| |
|<- PONG --------------------- |
| |
[接続維持] |
| |
メッセージ圧縮
大きなメッセージを送信する場合、圧縮が有効です:
import "google.golang.org/grpc/encoding/gzip"
// サーバー側で圧縮を有効化
s := grpc.NewServer(
grpc.RPCCompressor(grpc.NewGZIPCompressor()),
grpc.RPCDecompressor(grpc.NewGZIPDecompressor()),
)
// クライアント側で圧縮を要求
response, err := client.GetUser(
ctx,
request,
grpc.UseCompressor(gzip.Name),
)
圧縮の効果:
メッセージサイズ: 1MB(JSON形式のユーザーデータ)
圧縮なし:
- ネットワーク転送: 1,000KB
- 転送時間: 100ms(10Mbps回線)
- CPU使用率: 低
gzip圧縮:
- ネットワーク転送: 200KB(圧縮率80%)
- 転送時間: 20ms
- CPU使用率: 中(圧縮・展開のオーバーヘッド)
トレードオフ:
- ネットワーク帯域が制約 → 圧縮推奨
- CPUリソースが制約 → 圧縮非推奨
バッチ処理
複数の小さなリクエストを1つにまとめることで、オーバーヘッドを削減:
// ❌ 非効率:個別リクエスト
for _, userID := range userIDs {
user, _ := client.GetUser(ctx, &pb.GetUserRequest{UserId: userID})
users = append(users, user)
}
// 100ユーザー → 100回のRPC(100ms × 100 = 10秒)
// ✅ 効率的:バッチリクエスト
response, _ := client.BatchGetUsers(ctx, &pb.BatchGetUsersRequest{
UserIds: userIDs,
})
users := response.Users
// 100ユーザー → 1回のRPC(100ms)
バッチRPCの実装:
service UserService {
rpc GetUser(GetUserRequest) returns (User);
rpc BatchGetUsers(BatchGetUsersRequest) returns (BatchGetUsersResponse);
}
message BatchGetUsersRequest {
repeated string user_ids = 1;
}
message BatchGetUsersResponse {
repeated User users = 1;
}
---
Interceptorチェーンの設計
Interceptorは、gRPCのミドルウェアであり、横断的関心事(認証、ログ、メトリクスなど)を実装します。
Interceptorの実行順序
// Interceptorチェーンの登録
s := grpc.NewServer(
grpc.ChainUnaryInterceptor(
loggingInterceptor, // 1番目
authInterceptor, // 2番目
metricsInterceptor, // 3番目
),
)
// 実行順序:
// リクエスト: logging → auth → metrics → handler
// レスポンス: handler → metrics → auth → logging
実行フロー:
リクエスト →
loggingInterceptor (開始ログ)
authInterceptor (認証チェック)
metricsInterceptor (開始時刻記録)
handler (ビジネスロジック)
metricsInterceptor (レイテンシ計算)
authInterceptor (認可確認)
loggingInterceptor (終了ログ)
→ レスポンス
実用的なInterceptorパターン
1. 認証Interceptor
func authInterceptor(
ctx context.Context,
req interface{},
info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler,
) (interface{}, error) {
// メタデータからトークンを取得
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return nil, status.Error(codes.Unauthenticated, "missing metadata")
}
tokens := md.Get("authorization")
if len(tokens) == 0 {
return nil, status.Error(codes.Unauthenticated, "missing token")
}
// JWT検証
claims, err := verifyJWT(tokens[0])
if err != nil {
return nil, status.Error(codes.Unauthenticated, "invalid token")
}
// コンテキストにユーザー情報を追加
ctx = context.WithValue(ctx, "user", claims)
// 次のInterceptorまたはHandlerを呼び出し
return handler(ctx, req)
}
// JWT検証の実装
func verifyJWT(token string) (*Claims, error) {
parsed, err := jwt.Parse(token, func(token *jwt.Token) (interface{}, error) {
if _, ok := token.Method.(*jwt.SigningMethodHMAC); !ok {
return nil, fmt.Errorf("unexpected signing method")
}
return []byte(jwtSecret), nil
})
if err != nil {
return nil, err
}
if claims, ok := parsed.Claims.(*Claims); ok && parsed.Valid {
return claims, nil
}
return nil, fmt.Errorf("invalid claims")
}
2. レート制限Interceptor
import "golang.org/x/time/rate"
// ユーザーごとのレート制限
type RateLimiter struct {
limiters sync.Map // map[string]*rate.Limiter
}
func newRateLimiter() *RateLimiter {
return &RateLimiter{}
}
func (r *RateLimiter) getLimiter(userID string) *rate.Limiter {
limiter, exists := r.limiters.Load(userID)
if !exists {
// 毎秒10リクエスト、バースト20まで許可
limiter = rate.NewLimiter(10, 20)
r.limiters.Store(userID, limiter)
}
return limiter.(*rate.Limiter)
}
var rateLimiter = newRateLimiter()
func rateLimitInterceptor(
ctx context.Context,
req interface{},
info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler,
) (interface{}, error) {
// コンテキストからユーザーIDを取得
user := ctx.Value("user").(*Claims)
limiter := rateLimiter.getLimiter(user.UserID)
if !limiter.Allow() {
return nil, status.Error(codes.ResourceExhausted, "rate limit exceeded")
}
return handler(ctx, req)
}
3. 分散トレーシングInterceptor
import (
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"
)
func tracingInterceptor(
ctx context.Context,
req interface{},
info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler,
) (interface{}, error) {
tracer := otel.Tracer("grpc-server")
ctx, span := tracer.Start(ctx, info.FullMethod)
defer span.End()
// リクエスト情報をスパンに記録
span.SetAttributes(
attribute.String("rpc.method", info.FullMethod),
attribute.String("rpc.service", extractService(info.FullMethod)),
)
// ハンドラー実行
resp, err := handler(ctx, req)
// エラーをスパンに記録
if err != nil {
span.RecordError(err)
span.SetStatus(codes2.Error, err.Error())
}
return resp, err
}
4. リカバリーInterceptor
func recoveryInterceptor(
ctx context.Context,
req interface{},
info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler,
) (resp interface{}, err error) {
// panicをキャッチ
defer func() {
if r := recover(); r != nil {
// スタックトレースをログに記録
stack := debug.Stack()
log.Printf("Panic recovered: %v\n%s", r, stack)
// gRPCエラーとして返す
err = status.Errorf(codes.Internal, "internal server error: %v", r)
}
}()
return handler(ctx, req)
}
---
発展的学習への道筋
gRPC-Web:ブラウザからのgRPC接続
gRPC-Webは、ブラウザからgRPCサーバーに接続するためのプロトコルです。
アーキテクチャ:
ブラウザ Envoy Proxy gRPCサーバー
| | |
|-- gRPC-Web ------------>| |
| (HTTP/1.1 or 2) | |
| |-- gRPC (HTTP/2) ----->|
| | |
|<-- gRPC-Web Response ---|<-- gRPC Response -----|
クライアントコード(TypeScript):
import { UserServiceClient } from './generated/user_grpc_web_pb';
import { GetUserRequest } from './generated/user_pb';
const client = new UserServiceClient('http://localhost:8080');
const request = new GetUserRequest();
request.setUserId('123');
client.getUser(request, {}, (err, response) => {
if (err) {
console.error(err);
} else {
console.log('User:', response.getUsername());
}
});
Connect:gRPCの次世代プロトコル
Connectは、Bufが開発した、gRPC、gRPC-Web、RESTful JSONをサポートする統合プロトコルです。
特徴:
- ブラウザから直接接続(プロキシ不要)
- cURLでのテストが可能
- HTTP/1.1とHTTP/2の両対応
- 既存のgRPCサーバーと互換性
サーバー実装(Connect for Go):
import (
"connectrpc.com/connect"
"golang.org/x/net/http2"
"golang.org/x/net/http2/h2c"
)
// Connectハンドラーの作成
func (s *server) GetUser(
ctx context.Context,
req *connect.Request[pb.GetUserRequest],
) (*connect.Response[pb.User], error) {
user, err := s.db.FindUser(req.Msg.UserId)
if err != nil {
return nil, connect.NewError(connect.CodeNotFound, err)
}
return connect.NewResponse(&pb.User{
Id: user.ID,
Username: user.Username,
}), nil
}
// HTTPサーバーの起動
mux := http.NewServeMux()
path, handler := pbconnect.NewUserServiceHandler(&server{})
mux.Handle(path, handler)
http.ListenAndServe(":8080", h2c.NewHandler(mux, &http2.Server{}))
クライアントからのcURL呼び出し:
# Connect protocol (JSON)
curl -X POST http://localhost:8080/user.v1.UserService/GetUser \
-H "Content-Type: application/json" \
-d '{"userId": "123"}'
# gRPC protocol
grpcurl -d '{"userId": "123"}' localhost:8080 user.v1.UserService/GetUser
サービスメッシュ統合:Istio/Linkerd
サービスメッシュは、gRPCサービスの運用を大幅に簡素化します。
Istio統合の例:
# DestinationRule: トラフィック管理
apiVersion: networking.istio.io/v1beta1
kind: DestinationRule
metadata:
name: user-service
spec:
host: user-service
trafficPolicy:
connectionPool:
tcp:
maxConnections: 100
http:
http2MaxRequests: 1000
maxRequestsPerConnection: 2
loadBalancer:
simple: LEAST_REQUEST
outlierDetection:
consecutiveErrors: 5
interval: 30s
baseEjectionTime: 30s
# VirtualService: リトライとタイムアウト
apiVersion: networking.istio.io/v1beta1
kind: VirtualService
metadata:
name: user-service
spec:
hosts:
- user-service
http:
- route:
- destination:
host: user-service
retries:
attempts: 3
perTryTimeout: 2s
retryOn: 5xx,reset,connect-failure,refused-stream
timeout: 10s
利点:
- アプリケーションコードを変更せずにトラフィック管理
- 自動的なmTLS暗号化
- 分散トレーシングの自動計装
- カナリアリリースやA/Bテストの簡単な実装
---
まとめ
gRPCの学習パス
レベル1:基礎
├─ Protocol Buffersの理解
├─ Unary RPCの実装
└─ エラーハンドリング
レベル2:中級
├─ 4つの通信パターンの使い分け
├─ Interceptorの実装
├─ テスト戦略
└─ パフォーマンス最適化
レベル3:上級
├─ 負荷分散とサービスディスカバリ
├─ サービスメッシュ統合
├─ 分散トレーシング
└─ セキュリティ(mTLS、認証)
レベル4:エキスパート
├─ gRPC-Webとブラウザ統合
├─ カスタムトランスポート実装
├─ プロトコル拡張
└─ 大規模システム設計
実務で重要なポイント
- パフォーマンス
- 可観測性
- セキュリティ
- 運用
gRPCは、現代のマイクロサービスアーキテクチャにおいて不可欠な技術です。この解説を基に、実際のプロジェクトで経験を積み、エキスパートレベルのスキルを習得してください。