gRPCサービス - 解答例

完全なProtocol Buffers定義

chat.proto

syntax = "proto3";

package chat;

option go_package = "chat/pb";

import "google/protobuf/timestamp.proto";
import "google/protobuf/empty.proto";

// チャットサービスの定義
service ChatService {
  // Unary RPC: ユーザー登録
  rpc RegisterUser(RegisterUserRequest) returns (UserResponse);

  // Unary RPC: ユーザー情報取得
  rpc GetUser(GetUserRequest) returns (UserResponse);

  // Server Streaming RPC: メッセージ履歴を取得
  rpc GetMessageHistory(GetMessageHistoryRequest) returns (stream ChatMessage);

  // Client Streaming RPC: バッチメッセージ送信
  rpc SendBatchMessages(stream ChatMessage) returns (BatchSendResponse);

  // Bidirectional Streaming RPC: リアルタイムチャット
  rpc Chat(stream ChatMessage) returns (stream ChatMessage);

  // Server Streaming RPC: ユーザーリストをストリーミング
  rpc ListActiveUsers(google.protobuf.Empty) returns (stream UserResponse);
}

// リクエスト・レスポンスメッセージ

message RegisterUserRequest {
  string username = 1;
  string email = 2;
}

message GetUserRequest {
  string user_id = 1;
}

message UserResponse {
  string user_id = 1;
  string username = 2;
  string email = 3;
  google.protobuf.Timestamp created_at = 4;
  UserStatus status = 5;
}

enum UserStatus {
  USER_STATUS_UNSPECIFIED = 0;
  USER_STATUS_ONLINE = 1;
  USER_STATUS_OFFLINE = 2;
  USER_STATUS_AWAY = 3;
}

message ChatMessage {
  string message_id = 1;
  string user_id = 2;
  string username = 3;
  string room_id = 4;
  string content = 5;
  google.protobuf.Timestamp timestamp = 6;
  MessageType type = 7;
}

enum MessageType {
  MESSAGE_TYPE_UNSPECIFIED = 0;
  MESSAGE_TYPE_TEXT = 1;
  MESSAGE_TYPE_IMAGE = 2;
  MESSAGE_TYPE_FILE = 3;
  MESSAGE_TYPE_SYSTEM = 4;
}

message GetMessageHistoryRequest {
  string room_id = 1;
  int32 limit = 2;
  google.protobuf.Timestamp before = 3;
}

message BatchSendResponse {
  int32 sent_count = 1;
  repeated string failed_message_ids = 2;
}

---

サーバー実装

server/main.go

package main

import (
	"context"
	"errors"
	"fmt"
	"io"
	"log"
	"net"
	"sync"
	"time"

	pb "chat/pb"
	"github.com/google/uuid"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/metadata"
	"google.golang.org/grpc/status"
	"google.golang.org/protobuf/types/known/emptypb"
	"google.golang.org/protobuf/types/known/timestamppb"
)

// User represents a registered user
type User struct {
	ID        string
	Username  string
	Email     string
	Status    pb.UserStatus
	CreatedAt time.Time
}

// Message represents a chat message
type Message struct {
	ID        string
	UserID    string
	Username  string
	RoomID    string
	Content   string
	Type      pb.MessageType
	Timestamp time.Time
}

// chatServer implements the ChatService
type chatServer struct {
	pb.UnimplementedChatServiceServer

	// User management
	mu      sync.RWMutex
	users   map[string]*User
	clients map[pb.ChatService_ChatServer]*clientInfo

	// Message storage
	messagesMu sync.RWMutex
	messages   map[string][]*Message // room_id -> messages
}

type clientInfo struct {
	userID   string
	username string
	roomID   string
}

func newChatServer() *chatServer {
	return &chatServer{
		users:    make(map[string]*User),
		clients:  make(map[pb.ChatService_ChatServer]*clientInfo),
		messages: make(map[string][]*Message),
	}
}

// Unary RPC: RegisterUser
func (s *chatServer) RegisterUser(ctx context.Context, req *pb.RegisterUserRequest) (*pb.UserResponse, error) {
	// 入力バリデーション
	if req.Username == "" {
		return nil, status.Error(codes.InvalidArgument, "username is required")
	}
	if req.Email == "" {
		return nil, status.Error(codes.InvalidArgument, "email is required")
	}

	// ユーザーIDの生成
	userID := uuid.New().String()

	user := &User{
		ID:        userID,
		Username:  req.Username,
		Email:     req.Email,
		Status:    pb.UserStatus_USER_STATUS_ONLINE,
		CreatedAt: time.Now(),
	}

	s.mu.Lock()
	s.users[userID] = user
	s.mu.Unlock()

	log.Printf("User registered: %s (%s)", user.Username, user.ID)

	return &pb.UserResponse{
		UserId:    user.ID,
		Username:  user.Username,
		Email:     user.Email,
		Status:    user.Status,
		CreatedAt: timestamppb.New(user.CreatedAt),
	}, nil
}

// Unary RPC: GetUser
func (s *chatServer) GetUser(ctx context.Context, req *pb.GetUserRequest) (*pb.UserResponse, error) {
	if req.UserId == "" {
		return nil, status.Error(codes.InvalidArgument, "user_id is required")
	}

	s.mu.RLock()
	user, exists := s.users[req.UserId]
	s.mu.RUnlock()

	if !exists {
		return nil, status.Error(codes.NotFound, "user not found")
	}

	return &pb.UserResponse{
		UserId:    user.ID,
		Username:  user.Username,
		Email:     user.Email,
		Status:    user.Status,
		CreatedAt: timestamppb.New(user.CreatedAt),
	}, nil
}

// Server Streaming RPC: GetMessageHistory
func (s *chatServer) GetMessageHistory(req *pb.GetMessageHistoryRequest, stream pb.ChatService_GetMessageHistoryServer) error {
	if req.RoomId == "" {
		return status.Error(codes.InvalidArgument, "room_id is required")
	}

	s.messagesMu.RLock()
	messages, exists := s.messages[req.RoomId]
	s.messagesMu.RUnlock()

	if !exists {
		return nil // 空の履歴を返す
	}

	// リミットが指定されていない場合は100件
	limit := req.Limit
	if limit == 0 {
		limit = 100
	}

	// メッセージをストリーミング送信
	count := 0
	for i := len(messages) - 1; i >= 0 && count < int(limit); i-- {
		msg := messages[i]

		// beforeフィルタリング
		if req.Before != nil && msg.Timestamp.After(req.Before.AsTime()) {
			continue
		}

		pbMsg := &pb.ChatMessage{
			MessageId: msg.ID,
			UserId:    msg.UserID,
			Username:  msg.Username,
			RoomId:    msg.RoomID,
			Content:   msg.Content,
			Type:      msg.Type,
			Timestamp: timestamppb.New(msg.Timestamp),
		}

		if err := stream.Send(pbMsg); err != nil {
			return status.Errorf(codes.Internal, "failed to send message: %v", err)
		}

		count++
	}

	log.Printf("Sent %d messages from room %s", count, req.RoomId)
	return nil
}

// Client Streaming RPC: SendBatchMessages
func (s *chatServer) SendBatchMessages(stream pb.ChatService_SendBatchMessagesServer) error {
	var sentCount int32
	var failedIDs []string

	for {
		msg, err := stream.Recv()
		if err == io.EOF {
			// すべてのメッセージを受信完了
			return stream.SendAndClose(&pb.BatchSendResponse{
				SentCount:        sentCount,
				FailedMessageIds: failedIDs,
			})
		}
		if err != nil {
			return status.Errorf(codes.Internal, "failed to receive message: %v", err)
		}

		// メッセージの保存
		if err := s.storeMessage(msg); err != nil {
			failedIDs = append(failedIDs, msg.MessageId)
			log.Printf("Failed to store message %s: %v", msg.MessageId, err)
			continue
		}

		sentCount++
	}
}

// Bidirectional Streaming RPC: Chat
func (s *chatServer) Chat(stream pb.ChatService_ChatServer) error {
	// クライアント情報の登録
	info := &clientInfo{}

	s.mu.Lock()
	s.clients[stream] = info
	s.mu.Unlock()

	defer func() {
		s.mu.Lock()
		delete(s.clients, stream)
		s.mu.Unlock()
		log.Printf("Client disconnected: %s", info.username)
	}()

	// メッセージ受信のゴルーチン
	errChan := make(chan error, 1)
	go func() {
		for {
			msg, err := stream.Recv()
			if err == io.EOF {
				errChan <- nil
				return
			}
			if err != nil {
				errChan <- err
				return
			}

			// 初回メッセージでクライアント情報を設定
			if info.userID == "" {
				info.userID = msg.UserId
				info.username = msg.Username
				info.roomID = msg.RoomId
				log.Printf("Client connected: %s (room: %s)", msg.Username, msg.RoomId)
			}

			// メッセージIDとタイムスタンプの設定
			if msg.MessageId == "" {
				msg.MessageId = uuid.New().String()
			}
			msg.Timestamp = timestamppb.Now()

			// メッセージの保存
			if err := s.storeMessage(msg); err != nil {
				log.Printf("Failed to store message: %v", err)
			}

			// ブロードキャスト
			s.broadcast(msg)
		}
	}()

	// エラーまたは正常終了を待機
	err := <-errChan
	if err != nil {
		return status.Errorf(codes.Internal, "chat error: %v", err)
	}

	return nil
}

// Server Streaming RPC: ListActiveUsers
func (s *chatServer) ListActiveUsers(_ *emptypb.Empty, stream pb.ChatService_ListActiveUsersServer) error {
	s.mu.RLock()
	defer s.mu.RUnlock()

	for _, user := range s.users {
		if user.Status == pb.UserStatus_USER_STATUS_ONLINE {
			resp := &pb.UserResponse{
				UserId:    user.ID,
				Username:  user.Username,
				Email:     user.Email,
				Status:    user.Status,
				CreatedAt: timestamppb.New(user.CreatedAt),
			}

			if err := stream.Send(resp); err != nil {
				return status.Errorf(codes.Internal, "failed to send user: %v", err)
			}
		}
	}

	return nil
}

// Helper functions

func (s *chatServer) storeMessage(msg *pb.ChatMessage) error {
	if msg.RoomId == "" {
		return errors.New("room_id is required")
	}

	message := &Message{
		ID:        msg.MessageId,
		UserID:    msg.UserId,
		Username:  msg.Username,
		RoomID:    msg.RoomId,
		Content:   msg.Content,
		Type:      msg.Type,
		Timestamp: msg.Timestamp.AsTime(),
	}

	s.messagesMu.Lock()
	s.messages[msg.RoomId] = append(s.messages[msg.RoomId], message)
	s.messagesMu.Unlock()

	return nil
}

func (s *chatServer) broadcast(msg *pb.ChatMessage) {
	s.mu.RLock()
	defer s.mu.RUnlock()

	for client, info := range s.clients {
		// 同じルームのクライアントにのみ送信
		if info.roomID == msg.RoomId {
			if err := client.Send(msg); err != nil {
				log.Printf("Failed to send to client %s: %v", info.username, err)
			}
		}
	}
}

// Interceptors

// loggingInterceptor logs all incoming requests
func loggingInterceptor(
	ctx context.Context,
	req interface{},
	info *grpc.UnaryServerInfo,
	handler grpc.UnaryHandler,
) (interface{}, error) {
	start := time.Now()

	// メタデータの取得
	md, _ := metadata.FromIncomingContext(ctx)

	log.Printf("Request - Method: %s, Metadata: %v", info.FullMethod, md)

	// ハンドラーの実行
	resp, err := handler(ctx, req)

	// レイテンシの計算
	latency := time.Since(start)

	if err != nil {
		log.Printf("Response - Method: %s, Error: %v, Latency: %v", info.FullMethod, err, latency)
	} else {
		log.Printf("Response - Method: %s, Success, Latency: %v", info.FullMethod, latency)
	}

	return resp, err
}

// authInterceptor validates authentication tokens
func authInterceptor(
	ctx context.Context,
	req interface{},
	info *grpc.UnaryServerInfo,
	handler grpc.UnaryHandler,
) (interface{}, error) {
	// 認証が不要なメソッドのリスト
	publicMethods := map[string]bool{
		"/chat.ChatService/RegisterUser": true,
	}

	if !publicMethods[info.FullMethod] {
		md, ok := metadata.FromIncomingContext(ctx)
		if !ok {
			return nil, status.Error(codes.Unauthenticated, "missing metadata")
		}

		tokens := md.Get("authorization")
		if len(tokens) == 0 {
			return nil, status.Error(codes.Unauthenticated, "missing authorization token")
		}

		// トークンの検証(実際にはJWT検証などを実装)
		token := tokens[0]
		if token != "valid-token" {
			return nil, status.Error(codes.Unauthenticated, "invalid token")
		}
	}

	return handler(ctx, req)
}

// metricsInterceptor collects metrics
func metricsInterceptor(
	ctx context.Context,
	req interface{},
	info *grpc.UnaryServerInfo,
	handler grpc.UnaryHandler,
) (interface{}, error) {
	start := time.Now()

	resp, err := handler(ctx, req)

	latency := time.Since(start)

	// メトリクスの記録(実際にはPrometheusなどに送信)
	log.Printf("Metrics - Method: %s, Latency: %v, Success: %v",
		info.FullMethod, latency, err == nil)

	return resp, err
}

// Stream Interceptor for logging
func streamLoggingInterceptor(
	srv interface{},
	ss grpc.ServerStream,
	info *grpc.StreamServerInfo,
	handler grpc.StreamHandler,
) error {
	start := time.Now()

	log.Printf("Stream Request - Method: %s, IsClientStream: %v, IsServerStream: %v",
		info.FullMethod, info.IsClientStream, info.IsServerStream)

	err := handler(srv, ss)

	latency := time.Since(start)

	if err != nil {
		log.Printf("Stream Response - Method: %s, Error: %v, Duration: %v",
			info.FullMethod, err, latency)
	} else {
		log.Printf("Stream Response - Method: %s, Success, Duration: %v",
			info.FullMethod, latency)
	}

	return err
}

func main() {
	// TCPリスナーの作成
	lis, err := net.Listen("tcp", ":50051")
	if err != nil {
		log.Fatalf("Failed to listen: %v", err)
	}

	// Interceptorチェーンの設定
	s := grpc.NewServer(
		grpc.ChainUnaryInterceptor(
			loggingInterceptor,
			authInterceptor,
			metricsInterceptor,
		),
		grpc.ChainStreamInterceptor(
			streamLoggingInterceptor,
		),
	)

	// サービスの登録
	chatSrv := newChatServer()
	pb.RegisterChatServiceServer(s, chatSrv)

	log.Printf("Server listening on %v", lis.Addr())

	// サーバーの起動
	if err := s.Serve(lis); err != nil {
		log.Fatalf("Failed to serve: %v", err)
	}
}

---

クライアント実装

client/main.go

package main

import (
	"bufio"
	"context"
	"fmt"
	"io"
	"log"
	"os"
	"time"

	pb "chat/pb"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	"google.golang.org/grpc/metadata"
	"google.golang.org/protobuf/types/known/timestamppb"
)

func main() {
	// サーバーへの接続
	conn, err := grpc.Dial(
		"localhost:50051",
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.WithDefaultServiceConfig(`{
			"loadBalancingPolicy": "round_robin",
			"methodConfig": [{
				"name": [{"service": "chat.ChatService"}],
				"retryPolicy": {
					"maxAttempts": 3,
					"initialBackoff": "0.1s",
					"maxBackoff": "1s",
					"backoffMultiplier": 2.0,
					"retryableStatusCodes": ["UNAVAILABLE"]
				}
			}]
		}`),
	)
	if err != nil {
		log.Fatalf("Failed to connect: %v", err)
	}
	defer conn.Close()

	client := pb.NewChatServiceClient(conn)

	// ユーザー登録
	fmt.Print("Enter username: ")
	scanner := bufio.NewScanner(os.Stdin)
	scanner.Scan()
	username := scanner.Text()

	fmt.Print("Enter email: ")
	scanner.Scan()
	email := scanner.Text()

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	userResp, err := client.RegisterUser(ctx, &pb.RegisterUserRequest{
		Username: username,
		Email:    email,
	})
	if err != nil {
		log.Fatalf("Failed to register: %v", err)
	}

	fmt.Printf("Registered as %s (ID: %s)\n", userResp.Username, userResp.UserId)

	// メッセージ履歴の取得(Server Streaming)
	fmt.Print("Enter room ID: ")
	scanner.Scan()
	roomID := scanner.Text()

	fmt.Println("\n--- Message History ---")
	historyStream, err := client.GetMessageHistory(context.Background(), &pb.GetMessageHistoryRequest{
		RoomId: roomID,
		Limit:  10,
	})
	if err != nil {
		log.Printf("Failed to get history: %v", err)
	} else {
		for {
			msg, err := historyStream.Recv()
			if err == io.EOF {
				break
			}
			if err != nil {
				log.Printf("Error receiving history: %v", err)
				break
			}
			fmt.Printf("[%s] %s: %s\n",
				msg.Timestamp.AsTime().Format("15:04:05"),
				msg.Username,
				msg.Content,
			)
		}
	}

	// チャット開始(Bidirectional Streaming)
	fmt.Println("\n--- Chat Started (type 'exit' to quit) ---")

	// 認証トークンを含むコンテキスト
	md := metadata.New(map[string]string{
		"authorization": "valid-token",
	})
	chatCtx := metadata.NewOutgoingContext(context.Background(), md)

	stream, err := client.Chat(chatCtx)
	if err != nil {
		log.Fatalf("Failed to start chat: %v", err)
	}

	// メッセージ受信のゴルーチン
	go func() {
		for {
			msg, err := stream.Recv()
			if err == io.EOF {
				return
			}
			if err != nil {
				log.Printf("Error receiving message: %v", err)
				return
			}

			// 自分のメッセージは表示しない
			if msg.UserId != userResp.UserId {
				fmt.Printf("\n[%s] %s: %s\n> ",
					msg.Timestamp.AsTime().Format("15:04:05"),
					msg.Username,
					msg.Content,
				)
			}
		}
	}()

	// メッセージ送信ループ
	for {
		fmt.Print("> ")
		scanner.Scan()
		text := scanner.Text()

		if text == "exit" {
			break
		}

		msg := &pb.ChatMessage{
			UserId:    userResp.UserId,
			Username:  userResp.Username,
			RoomId:    roomID,
			Content:   text,
			Type:      pb.MessageType_MESSAGE_TYPE_TEXT,
			Timestamp: timestamppb.Now(),
		}

		if err := stream.Send(msg); err != nil {
			log.Printf("Failed to send message: %v", err)
			break
		}
	}

	// ストリームのクローズ
	if err := stream.CloseSend(); err != nil {
		log.Printf("Failed to close stream: %v", err)
	}

	fmt.Println("Chat ended")
}

---

テストコード

server_test.go

package main

import (
	"context"
	"io"
	"net"
	"testing"
	"time"

	pb "chat/pb"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/credentials/insecure"
	"google.golang.org/grpc/status"
	"google.golang.org/grpc/test/bufconn"
	"google.golang.org/protobuf/types/known/emptypb"
)

const bufSize = 1024 * 1024

var lis *bufconn.Listener

func init() {
	lis = bufconn.Listen(bufSize)
	s := grpc.NewServer()
	pb.RegisterChatServiceServer(s, newChatServer())
	go func() {
		if err := s.Serve(lis); err != nil {
			panic(err)
		}
	}()
}

func bufDialer(context.Context, string) (net.Conn, error) {
	return lis.Dial()
}

func TestRegisterUser(t *testing.T) {
	ctx := context.Background()
	conn, err := grpc.DialContext(
		ctx,
		"bufnet",
		grpc.WithContextDialer(bufDialer),
		grpc.WithTransportCredentials(insecure.NewCredentials()),
	)
	if err != nil {
		t.Fatalf("Failed to dial bufnet: %v", err)
	}
	defer conn.Close()

	client := pb.NewChatServiceClient(conn)

	tests := []struct {
		name      string
		req       *pb.RegisterUserRequest
		wantErr   bool
		wantCode  codes.Code
	}{
		{
			name: "valid registration",
			req: &pb.RegisterUserRequest{
				Username: "alice",
				Email:    "alice@example.com",
			},
			wantErr: false,
		},
		{
			name: "missing username",
			req: &pb.RegisterUserRequest{
				Username: "",
				Email:    "alice@example.com",
			},
			wantErr:  true,
			wantCode: codes.InvalidArgument,
		},
		{
			name: "missing email",
			req: &pb.RegisterUserRequest{
				Username: "alice",
				Email:    "",
			},
			wantErr:  true,
			wantCode: codes.InvalidArgument,
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			resp, err := client.RegisterUser(ctx, tt.req)

			if tt.wantErr {
				if err == nil {
					t.Error("expected error, got nil")
				}
				st, ok := status.FromError(err)
				if !ok {
					t.Errorf("expected gRPC status error, got %v", err)
				}
				if st.Code() != tt.wantCode {
					t.Errorf("expected code %v, got %v", tt.wantCode, st.Code())
				}
			} else {
				if err != nil {
					t.Errorf("unexpected error: %v", err)
				}
				if resp.Username != tt.req.Username {
					t.Errorf("expected username %s, got %s", tt.req.Username, resp.Username)
				}
				if resp.UserId == "" {
					t.Error("expected non-empty user ID")
				}
			}
		})
	}
}

func TestGetUser(t *testing.T) {
	ctx := context.Background()
	conn, err := grpc.DialContext(
		ctx,
		"bufnet",
		grpc.WithContextDialer(bufDialer),
		grpc.WithTransportCredentials(insecure.NewCredentials()),
	)
	if err != nil {
		t.Fatalf("Failed to dial bufnet: %v", err)
	}
	defer conn.Close()

	client := pb.NewChatServiceClient(conn)

	// ユーザー登録
	registerResp, err := client.RegisterUser(ctx, &pb.RegisterUserRequest{
		Username: "bob",
		Email:    "bob@example.com",
	})
	if err != nil {
		t.Fatalf("Failed to register user: %v", err)
	}

	// 登録したユーザーを取得
	getUserResp, err := client.GetUser(ctx, &pb.GetUserRequest{
		UserId: registerResp.UserId,
	})
	if err != nil {
		t.Fatalf("Failed to get user: %v", err)
	}

	if getUserResp.UserId != registerResp.UserId {
		t.Errorf("expected user ID %s, got %s", registerResp.UserId, getUserResp.UserId)
	}

	// 存在しないユーザーを取得
	_, err = client.GetUser(ctx, &pb.GetUserRequest{
		UserId: "non-existent-id",
	})
	if err == nil {
		t.Error("expected error for non-existent user")
	}
	st, _ := status.FromError(err)
	if st.Code() != codes.NotFound {
		t.Errorf("expected NotFound, got %v", st.Code())
	}
}

func TestGetMessageHistory(t *testing.T) {
	ctx := context.Background()
	conn, err := grpc.DialContext(
		ctx,
		"bufnet",
		grpc.WithContextDialer(bufDialer),
		grpc.WithTransportCredentials(insecure.NewCredentials()),
	)
	if err != nil {
		t.Fatalf("Failed to dial bufnet: %v", err)
	}
	defer conn.Close()

	client := pb.NewChatServiceClient(conn)

	// メッセージ履歴を取得(空のルーム)
	stream, err := client.GetMessageHistory(ctx, &pb.GetMessageHistoryRequest{
		RoomId: "test-room",
		Limit:  10,
	})
	if err != nil {
		t.Fatalf("Failed to get message history: %v", err)
	}

	count := 0
	for {
		_, err := stream.Recv()
		if err == io.EOF {
			break
		}
		if err != nil {
			t.Fatalf("Error receiving message: %v", err)
		}
		count++
	}

	if count != 0 {
		t.Errorf("expected 0 messages in empty room, got %d", count)
	}
}

func TestSendBatchMessages(t *testing.T) {
	ctx := context.Background()
	conn, err := grpc.DialContext(
		ctx,
		"bufnet",
		grpc.WithContextDialer(bufDialer),
		grpc.WithTransportCredentials(insecure.NewCredentials()),
	)
	if err != nil {
		t.Fatalf("Failed to dial bufnet: %v", err)
	}
	defer conn.Close()

	client := pb.NewChatServiceClient(conn)

	stream, err := client.SendBatchMessages(ctx)
	if err != nil {
		t.Fatalf("Failed to start batch send: %v", err)
	}

	// 複数のメッセージを送信
	messages := []*pb.ChatMessage{
		{
			MessageId: "msg1",
			UserId:    "user1",
			Username:  "Alice",
			RoomId:    "room1",
			Content:   "Hello",
			Type:      pb.MessageType_MESSAGE_TYPE_TEXT,
		},
		{
			MessageId: "msg2",
			UserId:    "user1",
			Username:  "Alice",
			RoomId:    "room1",
			Content:   "World",
			Type:      pb.MessageType_MESSAGE_TYPE_TEXT,
		},
	}

	for _, msg := range messages {
		if err := stream.Send(msg); err != nil {
			t.Fatalf("Failed to send message: %v", err)
		}
	}

	resp, err := stream.CloseAndRecv()
	if err != nil {
		t.Fatalf("Failed to close and receive: %v", err)
	}

	if resp.SentCount != int32(len(messages)) {
		t.Errorf("expected %d messages sent, got %d", len(messages), resp.SentCount)
	}
}

func TestListActiveUsers(t *testing.T) {
	ctx := context.Background()
	conn, err := grpc.DialContext(
		ctx,
		"bufnet",
		grpc.WithContextDialer(bufDialer),
		grpc.WithTransportCredentials(insecure.NewCredentials()),
	)
	if err != nil {
		t.Fatalf("Failed to dial bufnet: %v", err)
	}
	defer conn.Close()

	client := pb.NewChatServiceClient(conn)

	// ユーザーを登録
	_, err = client.RegisterUser(ctx, &pb.RegisterUserRequest{
		Username: "charlie",
		Email:    "charlie@example.com",
	})
	if err != nil {
		t.Fatalf("Failed to register user: %v", err)
	}

	// アクティブユーザーをリスト
	stream, err := client.ListActiveUsers(ctx, &emptypb.Empty{})
	if err != nil {
		t.Fatalf("Failed to list users: %v", err)
	}

	count := 0
	for {
		user, err := stream.Recv()
		if err == io.EOF {
			break
		}
		if err != nil {
			t.Fatalf("Error receiving user: %v", err)
		}
		t.Logf("Active user: %s", user.Username)
		count++
	}

	if count == 0 {
		t.Error("expected at least one active user")
	}
}

func TestChatBidirectionalStreaming(t *testing.T) {
	ctx := context.Background()
	conn, err := grpc.DialContext(
		ctx,
		"bufnet",
		grpc.WithContextDialer(bufDialer),
		grpc.WithTransportCredentials(insecure.NewCredentials()),
	)
	if err != nil {
		t.Fatalf("Failed to dial bufnet: %v", err)
	}
	defer conn.Close()

	client := pb.NewChatServiceClient(conn)

	stream, err := client.Chat(ctx)
	if err != nil {
		t.Fatalf("Failed to start chat: %v", err)
	}

	// メッセージ送信
	testMsg := &pb.ChatMessage{
		UserId:   "test-user",
		Username: "TestUser",
		RoomId:   "test-room",
		Content:  "Test message",
		Type:     pb.MessageType_MESSAGE_TYPE_TEXT,
	}

	if err := stream.Send(testMsg); err != nil {
		t.Fatalf("Failed to send message: %v", err)
	}

	// 少し待機してブロードキャストを確認
	time.Sleep(100 * time.Millisecond)

	// ストリームをクローズ
	if err := stream.CloseSend(); err != nil {
		t.Fatalf("Failed to close send: %v", err)
	}
}

// ベンチマークテスト
func BenchmarkRegisterUser(b *testing.B) {
	ctx := context.Background()
	conn, err := grpc.DialContext(
		ctx,
		"bufnet",
		grpc.WithContextDialer(bufDialer),
		grpc.WithTransportCredentials(insecure.NewCredentials()),
	)
	if err != nil {
		b.Fatalf("Failed to dial bufnet: %v", err)
	}
	defer conn.Close()

	client := pb.NewChatServiceClient(conn)

	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		_, err := client.RegisterUser(ctx, &pb.RegisterUserRequest{
			Username: "benchuser",
			Email:    "bench@example.com",
		})
		if err != nil {
			b.Fatalf("Failed to register: %v", err)
		}
	}
}

---

エラーハンドリングとステータスコード

errors.go

package main

import (
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// カスタムエラータイプ
var (
	ErrUserNotFound      = status.Error(codes.NotFound, "user not found")
	ErrRoomNotFound      = status.Error(codes.NotFound, "room not found")
	ErrInvalidUsername   = status.Error(codes.InvalidArgument, "invalid username")
	ErrInvalidEmail      = status.Error(codes.InvalidArgument, "invalid email format")
	ErrUnauthorized      = status.Error(codes.Unauthenticated, "unauthorized")
	ErrPermissionDenied  = status.Error(codes.PermissionDenied, "permission denied")
	ErrRateLimitExceeded = status.Error(codes.ResourceExhausted, "rate limit exceeded")
)

// エラーをgRPCステータスに変換
func toGRPCError(err error) error {
	if err == nil {
		return nil
	}

	// すでにgRPCエラーの場合はそのまま返す
	if _, ok := status.FromError(err); ok {
		return err
	}

	// その他のエラーはInternalエラーとして返す
	return status.Errorf(codes.Internal, "internal error: %v", err)
}

---

まとめ

この解答例では、以下の内容を実装しました:

1. 完全なProtocol Buffers定義

  • 4種類のRPC通信パターン(Unary、Server Streaming、Client Streaming、Bidirectional Streaming)
  • 適切なメッセージ型とenum定義
  • タイムスタンプとemptyメッセージのインポート

2. サーバー実装

  • ユーザー管理とメッセージストレージ
  • 並行安全な実装(sync.RWMutex)
  • リアルタイムブロードキャスト機能

3. Interceptor実装

  • ロギング(リクエスト/レスポンスの記録)
  • 認証(トークン検証)
  • メトリクス収集(レイテンシ測定)
  • ストリーミング用Interceptor

4. クライアント実装

  • すべてのRPCパターンの使用例
  • リトライポリシーの設定
  • メタデータ(認証トークン)の送信

5. テストコード

  • 単体テスト(各RPCメソッド)
  • ストリーミングテスト
  • ベンチマークテスト
  • bufconn(インメモリテスト)の活用

6. エラーハンドリング

  • 適切なgRPCステータスコードの使用
  • カスタムエラー定義
  • エラー変換ユーティリティ

この実装は、本番環境で使用できる水準のコード品質を目指しており、実務で必要となる要素をすべて含んでいます。