Day 6: 高度な並行パターンとContext - 講義

今日の目標

Go言語の最も強力な機能の一つである並行処理をマスターするための最終段階に入ります。contextパッケージによる複雑な並行処理の制御、syncパッケージの高度な機能、そして実践的なエラーハンドリングパターンを学びます。

学習目標

  • contextパッケージの完全な理解と実践的な使用
  • sync.Poolやsync.Mapなどの高度な同期プリミティブ
  • エラーハンドリングの高度なパターン(エラーラッピング、カスタムエラー型)
  • リフレクションの基礎と適切な使用場面
  • プロダクションレベルの並行パターン
  • ---

    Part 1: Context パッケージの完全ガイド

    1.1 Contextの哲学と設計原則

    Goのcontextパッケージは、並行処理におけるキャンセルシグナル、タイムアウト、リクエストスコープ値の伝播を統一的に扱うために設計されました。2014年にGoogleのSameer Ajmaniによって提案され、Go 1.7で標準ライブラリに追加されました。

    なぜContextが必要なのか?

    // Context以前の問題あるコード
    func badExample() {
        done := make(chan bool)
        go func() {
            // 長時間実行される処理
            time.Sleep(10 * time.Second)
            done <- true
        }()
    
        // 途中でキャンセルする方法がない!
        // メモリリークの原因になる
    }
    
    // Contextを使った正しいコード
    func goodExample(ctx context.Context) error {
        done := make(chan bool)
        go func() {
            select {
            case <-time.After(10 * time.Second):
                done <- true
            case <-ctx.Done():
                return // クリーンに終了
            }
        }()
    
        select {
        case <-done:
            return nil
        case <-ctx.Done():
            return ctx.Err()
        }
    }
    

    1.2 Context の種類と使い分け

    package main
    
    import (
        "context"
        "fmt"
        "time"
    )
    
    // 1. Background Context - ルートコンテキスト
    func demonstrateBackground() {
        // main関数や初期化処理で使用
        ctx := context.Background()
        fmt.Printf("Background context: %v\n", ctx)
    }
    
    // 2. TODO Context - 一時的なプレースホルダー
    func demonstrateTODO() {
        // どのcontextを使うべきか不明な場合の一時的な使用
        ctx := context.TODO()
        fmt.Printf("TODO context: %v\n", ctx)
    }
    
    // 3. WithCancel - 手動キャンセル
    func demonstrateWithCancel() {
        ctx, cancel := context.WithCancel(context.Background())
        defer cancel() // 必ずcancelを呼ぶ
    
        go func() {
            select {
            case <-time.After(5 * time.Second):
                fmt.Println("Operation completed")
            case <-ctx.Done():
                fmt.Println("Operation cancelled:", ctx.Err())
            }
        }()
    
        // 2秒後にキャンセル
        time.Sleep(2 * time.Second)
        cancel()
        time.Sleep(1 * time.Second)
    }
    
    // 4. WithTimeout - 自動タイムアウト
    func demonstrateWithTimeout() {
        ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
        defer cancel()
    
        select {
        case <-time.After(3 * time.Second):
            fmt.Println("Operation completed")
        case <-ctx.Done():
            fmt.Println("Timeout:", ctx.Err())
        }
    }
    
    // 5. WithDeadline - 絶対時刻でのタイムアウト
    func demonstrateWithDeadline() {
        deadline := time.Now().Add(2 * time.Second)
        ctx, cancel := context.WithDeadline(context.Background(), deadline)
        defer cancel()
    
        select {
        case <-time.After(3 * time.Second):
            fmt.Println("Operation completed")
        case <-ctx.Done():
            fmt.Println("Deadline exceeded:", ctx.Err())
        }
    }
    
    // 6. WithValue - リクエストスコープ値の伝播
    type contextKey string
    
    const (
        userIDKey     contextKey = "userID"
        requestIDKey  contextKey = "requestID"
    )
    
    func demonstrateWithValue() {
        ctx := context.Background()
        ctx = context.WithValue(ctx, userIDKey, "user123")
        ctx = context.WithValue(ctx, requestIDKey, "req456")
    
        processRequest(ctx)
    }
    
    func processRequest(ctx context.Context) {
        userID := ctx.Value(userIDKey).(string)
        requestID := ctx.Value(requestIDKey).(string)
    
        fmt.Printf("Processing request %s for user %s\n", requestID, userID)
    }
    

    1.3 実践的なContextパターン

    パターン1: HTTPリクエストでのContext使用

    package main
    
    import (
        "context"
        "fmt"
        "net/http"
        "time"
    )
    
    // HTTPサーバーでのContext使用例
    func handleRequest(w http.ResponseWriter, r *http.Request) {
        // リクエストからcontextを取得
        ctx := r.Context()
    
        // タイムアウトを追加
        ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
        defer cancel()
    
        // データベースクエリを実行
        result, err := queryDatabase(ctx, "SELECT * FROM users")
        if err != nil {
            if err == context.DeadlineExceeded {
                http.Error(w, "Request timeout", http.StatusGatewayTimeout)
                return
            }
            http.Error(w, err.Error(), http.StatusInternalServerError)
            return
        }
    
        fmt.Fprintf(w, "Result: %v", result)
    }
    
    func queryDatabase(ctx context.Context, query string) (string, error) {
        // シミュレートされたデータベースクエリ
        resultChan := make(chan string)
        errChan := make(chan error)
    
        go func() {
            // 実際のクエリ処理
            time.Sleep(3 * time.Second)
            resultChan <- "query result"
        }()
    
        select {
        case result := <-resultChan:
            return result, nil
        case err := <-errChan:
            return "", err
        case <-ctx.Done():
            return "", ctx.Err()
        }
    }
    

    パターン2: 複数のゴルーチンへのキャンセル伝播

    package main
    
    import (
        "context"
        "fmt"
        "sync"
        "time"
    )
    
    func orchestrateMultipleTasks() {
        ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
        defer cancel()
    
        var wg sync.WaitGroup
    
        // 3つの並行タスクを起動
        tasks := []string{"task1", "task2", "task3"}
        for _, task := range tasks {
            wg.Add(1)
            go func(taskName string) {
                defer wg.Done()
                if err := performTask(ctx, taskName); err != nil {
                    fmt.Printf("%s failed: %v\n", taskName, err)
                    cancel() // 1つでも失敗したら全てキャンセル
                }
            }(task)
        }
    
        wg.Wait()
    }
    
    func performTask(ctx context.Context, name string) error {
        for i := 0; i < 10; i++ {
            select {
            case <-ctx.Done():
                return ctx.Err()
            case <-time.After(500 * time.Millisecond):
                fmt.Printf("%s: step %d\n", name, i)
            }
        }
        return nil
    }
    

    パターン3: Context継承チェーン

    package main
    
    import (
        "context"
        "fmt"
        "time"
    )
    
    type service struct {
        name string
    }
    
    // サービス層でのContext使用
    func (s *service) HandleOperation(ctx context.Context, userID string) error {
        // リクエストIDを追加
        ctx = context.WithValue(ctx, requestIDKey, generateRequestID())
    
        // ユーザーIDを追加
        ctx = context.WithValue(ctx, userIDKey, userID)
    
        // タイムアウトを設定(親のタイムアウトより短い)
        ctx, cancel := context.WithTimeout(ctx, 3*time.Second)
        defer cancel()
    
        // ビジネスロジックを実行
        return s.executeBusinessLogic(ctx)
    }
    
    func (s *service) executeBusinessLogic(ctx context.Context) error {
        // データベース操作
        if err := s.fetchUserData(ctx); err != nil {
            return err
        }
    
        // 外部API呼び出し
        if err := s.callExternalAPI(ctx); err != nil {
            return err
        }
    
        return nil
    }
    
    func (s *service) fetchUserData(ctx context.Context) error {
        // contextから値を取得
        userID := ctx.Value(userIDKey)
        requestID := ctx.Value(requestIDKey)
    
        fmt.Printf("[%v] Fetching data for user %v\n", requestID, userID)
    
        // データベースクエリをシミュレート
        select {
        case <-time.After(1 * time.Second):
            return nil
        case <-ctx.Done():
            return ctx.Err()
        }
    }
    
    func (s *service) callExternalAPI(ctx context.Context) error {
        requestID := ctx.Value(requestIDKey)
        fmt.Printf("[%v] Calling external API\n", requestID)
    
        select {
        case <-time.After(1 * time.Second):
            return nil
        case <-ctx.Done():
            return ctx.Err()
        }
    }
    
    func generateRequestID() string {
        return fmt.Sprintf("req-%d", time.Now().UnixNano())
    }
    

    ---

    Part 2: Sync パッケージの高度な機能

    2.1 sync.Pool - オブジェクトプーリング

    sync.Poolは一時的なオブジェクトの再利用を可能にし、GCの圧力を減らします。

    package main
    
    import (
        "bytes"
        "fmt"
        "sync"
    )
    
    // バッファプールの実装
    var bufferPool = sync.Pool{
        New: func() interface{} {
            // 新しいバッファを作成
            return new(bytes.Buffer)
        },
    }
    
    func processData(data string) string {
        // プールからバッファを取得
        buf := bufferPool.Get().(*bytes.Buffer)
    
        // 使用後はリセットして返却
        defer func() {
            buf.Reset()
            bufferPool.Put(buf)
        }()
    
        // バッファを使用
        buf.WriteString("Processed: ")
        buf.WriteString(data)
    
        return buf.String()
    }
    
    // 実践例: HTTPレスポンスバッファ
    type ResponseWriter struct {
        pool *sync.Pool
    }
    
    func NewResponseWriter() *ResponseWriter {
        return &ResponseWriter{
            pool: &sync.Pool{
                New: func() interface{} {
                    return bytes.NewBuffer(make([]byte, 0, 1024))
                },
            },
        }
    }
    
    func (rw *ResponseWriter) WriteResponse(data []byte) []byte {
        buf := rw.pool.Get().(*bytes.Buffer)
        defer rw.pool.Put(buf)
    
        buf.Reset()
        buf.WriteString("HTTP/1.1 200 OK\r\n")
        buf.WriteString("Content-Type: application/json\r\n\r\n")
        buf.Write(data)
    
        result := make([]byte, buf.Len())
        copy(result, buf.Bytes())
        return result
    }
    

    2.2 sync.Map - 並行安全なマップ

    標準のmapはスレッドセーフではありませんが、sync.Mapは並行読み書きに最適化されています。

    package main
    
    import (
        "fmt"
        "sync"
    )
    
    // キャッシュの実装
    type Cache struct {
        data sync.Map
    }
    
    func NewCache() *Cache {
        return &Cache{}
    }
    
    func (c *Cache) Set(key string, value interface{}) {
        c.data.Store(key, value)
    }
    
    func (c *Cache) Get(key string) (interface{}, bool) {
        return c.data.Load(key)
    }
    
    func (c *Cache) Delete(key string) {
        c.data.Delete(key)
    }
    
    func (c *Cache) Range(f func(key, value interface{}) bool) {
        c.data.Range(f)
    }
    
    // 実践例: セッションストア
    type SessionStore struct {
        sessions sync.Map
    }
    
    type Session struct {
        UserID    string
        CreatedAt int64
        Data      map[string]interface{}
    }
    
    func (s *SessionStore) CreateSession(sessionID, userID string) {
        session := &Session{
            UserID:    userID,
            CreatedAt: time.Now().Unix(),
            Data:      make(map[string]interface{}),
        }
        s.sessions.Store(sessionID, session)
    }
    
    func (s *SessionStore) GetSession(sessionID string) (*Session, bool) {
        if val, ok := s.sessions.Load(sessionID); ok {
            return val.(*Session), true
        }
        return nil, false
    }
    
    func (s *SessionStore) DeleteSession(sessionID string) {
        s.sessions.Delete(sessionID)
    }
    
    // アクティブセッション数を取得
    func (s *SessionStore) ActiveSessions() int {
        count := 0
        s.sessions.Range(func(key, value interface{}) bool {
            count++
            return true
        })
        return count
    }
    

    2.3 sync.Once - 一度だけの実行保証

    package main
    
    import (
        "fmt"
        "sync"
    )
    
    // シングルトンパターンの実装
    type Database struct {
        connection string
    }
    
    var (
        instance *Database
        once     sync.Once
    )
    
    func GetDatabaseInstance() *Database {
        once.Do(func() {
            fmt.Println("Initializing database connection...")
            instance = &Database{
                connection: "postgres://localhost:5432",
            }
        })
        return instance
    }
    
    // 設定の遅延初期化
    type Config struct {
        once   sync.Once
        values map[string]string
    }
    
    func (c *Config) Load() {
        c.once.Do(func() {
            fmt.Println("Loading configuration...")
            c.values = map[string]string{
                "host": "localhost",
                "port": "8080",
            }
        })
    }
    
    func (c *Config) Get(key string) string {
        c.Load()
        return c.values[key]
    }
    

    2.4 sync.Cond - 条件変数

    package main
    
    import (
        "fmt"
        "sync"
        "time"
    )
    
    // 生産者・消費者パターン
    type Queue struct {
        mu    sync.Mutex
        cond  *sync.Cond
        items []int
    }
    
    func NewQueue() *Queue {
        q := &Queue{
            items: make([]int, 0),
        }
        q.cond = sync.NewCond(&q.mu)
        return q
    }
    
    func (q *Queue) Enqueue(item int) {
        q.mu.Lock()
        defer q.mu.Unlock()
    
        q.items = append(q.items, item)
        q.cond.Signal() // 待機中の1つのゴルーチンを起こす
    }
    
    func (q *Queue) Dequeue() int {
        q.mu.Lock()
        defer q.mu.Unlock()
    
        // キューが空の間待機
        for len(q.items) == 0 {
            q.cond.Wait()
        }
    
        item := q.items[0]
        q.items = q.items[1:]
        return item
    }
    
    // 使用例
    func demonstrateQueue() {
        q := NewQueue()
    
        // 消費者
        go func() {
            for i := 0; i < 5; i++ {
                item := q.Dequeue()
                fmt.Printf("Consumed: %d\n", item)
                time.Sleep(1 * time.Second)
            }
        }()
    
        // 生産者
        for i := 0; i < 5; i++ {
            q.Enqueue(i)
            fmt.Printf("Produced: %d\n", i)
            time.Sleep(500 * time.Millisecond)
        }
    }
    

    ---

    Part 3: 高度なエラーハンドリング

    3.1 エラーラッピングとアンラッピング

    package main
    
    import (
        "errors"
        "fmt"
    )
    
    // カスタムエラー型
    type ValidationError struct {
        Field   string
        Message string
    }
    
    func (e *ValidationError) Error() string {
        return fmt.Sprintf("validation error on field %s: %s", e.Field, e.Message)
    }
    
    // エラーラッピング
    func readConfig(path string) error {
        if err := validatePath(path); err != nil {
            return fmt.Errorf("read config: %w", err)
        }
    
        if err := parseConfig(path); err != nil {
            return fmt.Errorf("read config: parse failed: %w", err)
        }
    
        return nil
    }
    
    func validatePath(path string) error {
        if path == "" {
            return &ValidationError{
                Field:   "path",
                Message: "cannot be empty",
            }
        }
        return nil
    }
    
    func parseConfig(path string) error {
        // パース処理
        return nil
    }
    
    // エラーチェック
    func handleConfigError() {
        err := readConfig("")
        if err != nil {
            // 特定のエラー型をチェック
            var validationErr *ValidationError
            if errors.As(err, &validationErr) {
                fmt.Printf("Validation failed: %s\n", validationErr.Field)
            }
    
            // エラーが特定のエラーを含むかチェック
            if errors.Is(err, os.ErrNotExist) {
                fmt.Println("Config file not found")
            }
    
            fmt.Printf("Error: %v\n", err)
        }
    }
    

    3.2 センチネルエラーとカスタムエラー

    package main
    
    import (
        "errors"
        "fmt"
    )
    
    // センチネルエラー(事前定義されたエラー)
    var (
        ErrNotFound      = errors.New("not found")
        ErrUnauthorized  = errors.New("unauthorized")
        ErrInvalidInput  = errors.New("invalid input")
    )
    
    // リッチなエラー情報を持つカスタムエラー
    type DatabaseError struct {
        Operation string
        Table     string
        Err       error
    }
    
    func (e *DatabaseError) Error() string {
        return fmt.Sprintf("database error during %s on table %s: %v",
            e.Operation, e.Table, e.Err)
    }
    
    func (e *DatabaseError) Unwrap() error {
        return e.Err
    }
    
    // 使用例
    func fetchUser(id int) error {
        if id <= 0 {
            return ErrInvalidInput
        }
    
        // データベース操作
        if err := queryUser(id); err != nil {
            return &DatabaseError{
                Operation: "SELECT",
                Table:     "users",
                Err:       err,
            }
        }
    
        return nil
    }
    
    func queryUser(id int) error {
        // シミュレート
        return ErrNotFound
    }
    

    3.3 エラーコレクション

    package main
    
    import (
        "fmt"
        "strings"
    )
    
    // 複数のエラーを集約
    type MultiError struct {
        Errors []error
    }
    
    func (m *MultiError) Error() string {
        if len(m.Errors) == 0 {
            return "no errors"
        }
    
        var messages []string
        for _, err := range m.Errors {
            messages = append(messages, err.Error())
        }
    
        return fmt.Sprintf("multiple errors: %s", strings.Join(messages, "; "))
    }
    
    func (m *MultiError) Add(err error) {
        if err != nil {
            m.Errors = append(m.Errors, err)
        }
    }
    
    func (m *MultiError) HasErrors() bool {
        return len(m.Errors) > 0
    }
    
    // 使用例: バリデーション
    func validateUser(user User) error {
        errs := &MultiError{}
    
        if user.Name == "" {
            errs.Add(fmt.Errorf("name is required"))
        }
    
        if user.Email == "" {
            errs.Add(fmt.Errorf("email is required"))
        }
    
        if user.Age < 0 || user.Age > 150 {
            errs.Add(fmt.Errorf("age must be between 0 and 150"))
        }
    
        if errs.HasErrors() {
            return errs
        }
    
        return nil
    }
    

    ---

    Part 4: リフレクションの基礎

    4.1 リフレクションの基本

    package main
    
    import (
        "fmt"
        "reflect"
    )
    
    func inspectValue(v interface{}) {
        // 型情報を取得
        t := reflect.TypeOf(v)
        fmt.Printf("Type: %v\n", t)
        fmt.Printf("Kind: %v\n", t.Kind())
    
        // 値情報を取得
        val := reflect.ValueOf(v)
        fmt.Printf("Value: %v\n", val)
        fmt.Printf("Is valid: %v\n", val.IsValid())
        fmt.Printf("Is zero: %v\n", val.IsZero())
    }
    
    // 構造体のフィールドを調査
    func inspectStruct(v interface{}) {
        t := reflect.TypeOf(v)
        val := reflect.ValueOf(v)
    
        if t.Kind() != reflect.Struct {
            fmt.Println("Not a struct")
            return
        }
    
        fmt.Printf("Struct: %s\n", t.Name())
        fmt.Printf("Number of fields: %d\n", t.NumField())
    
        for i := 0; i < t.NumField(); i++ {
            field := t.Field(i)
            value := val.Field(i)
    
            fmt.Printf("  Field %d: %s %s = %v\n",
                i, field.Name, field.Type, value.Interface())
    
            // タグを取得
            if tag := field.Tag.Get("json"); tag != "" {
                fmt.Printf("    JSON tag: %s\n", tag)
            }
        }
    }
    

    4.2 実践的なリフレクション使用例

    package main
    
    import (
        "fmt"
        "reflect"
    )
    
    // JSONエンコーダー(簡易版)
    func simpleJSONEncode(v interface{}) string {
        val := reflect.ValueOf(v)
        t := val.Type()
    
        if t.Kind() != reflect.Struct {
            return fmt.Sprintf("%v", v)
        }
    
        result := "{"
    
        for i := 0; i < val.NumField(); i++ {
            if i > 0 {
                result += ", "
            }
    
            field := t.Field(i)
            value := val.Field(i)
    
            jsonTag := field.Tag.Get("json")
            if jsonTag == "" {
                jsonTag = field.Name
            }
    
            result += fmt.Sprintf(`"%s": `, jsonTag)
    
            switch value.Kind() {
            case reflect.String:
                result += fmt.Sprintf(`"%s"`, value.String())
            case reflect.Int, reflect.Int64:
                result += fmt.Sprintf("%d", value.Int())
            case reflect.Bool:
                result += fmt.Sprintf("%t", value.Bool())
            default:
                result += fmt.Sprintf("%v", value.Interface())
            }
        }
    
        result += "}"
        return result
    }
    
    // 構造体のコピー
    func copyStruct(src, dst interface{}) error {
        srcVal := reflect.ValueOf(src)
        dstVal := reflect.ValueOf(dst)
    
        if srcVal.Kind() != reflect.Ptr || dstVal.Kind() != reflect.Ptr {
            return fmt.Errorf("both src and dst must be pointers")
        }
    
        srcVal = srcVal.Elem()
        dstVal = dstVal.Elem()
    
        if srcVal.Type() != dstVal.Type() {
            return fmt.Errorf("src and dst must be the same type")
        }
    
        for i := 0; i < srcVal.NumField(); i++ {
            dstVal.Field(i).Set(srcVal.Field(i))
        }
    
        return nil
    }
    

    ---

    Part 5: プロダクショングレードの並行パターン

    5.1 ワーカープールパターン

    package main
    
    import (
        "context"
        "fmt"
        "sync"
        "time"
    )
    
    type Job struct {
        ID      int
        Payload string
    }
    
    type Result struct {
        Job    Job
        Output string
        Error  error
    }
    
    type WorkerPool struct {
        workerCount int
        jobs        chan Job
        results     chan Result
        ctx         context.Context
        cancel      context.CancelFunc
        wg          sync.WaitGroup
    }
    
    func NewWorkerPool(workerCount int) *WorkerPool {
        ctx, cancel := context.WithCancel(context.Background())
    
        return &WorkerPool{
            workerCount: workerCount,
            jobs:        make(chan Job, 100),
            results:     make(chan Result, 100),
            ctx:         ctx,
            cancel:      cancel,
        }
    }
    
    func (wp *WorkerPool) Start() {
        for i := 0; i < wp.workerCount; i++ {
            wp.wg.Add(1)
            go wp.worker(i)
        }
    }
    
    func (wp *WorkerPool) worker(id int) {
        defer wp.wg.Done()
    
        fmt.Printf("Worker %d started\n", id)
    
        for {
            select {
            case job, ok := <-wp.jobs:
                if !ok {
                    fmt.Printf("Worker %d finished\n", id)
                    return
                }
    
                // ジョブを処理
                result := wp.processJob(job)
    
                select {
                case wp.results <- result:
                case <-wp.ctx.Done():
                    return
                }
    
            case <-wp.ctx.Done():
                fmt.Printf("Worker %d cancelled\n", id)
                return
            }
        }
    }
    
    func (wp *WorkerPool) processJob(job Job) Result {
        // シミュレートされた処理
        time.Sleep(100 * time.Millisecond)
    
        return Result{
            Job:    job,
            Output: fmt.Sprintf("Processed: %s", job.Payload),
            Error:  nil,
        }
    }
    
    func (wp *WorkerPool) Submit(job Job) {
        wp.jobs <- job
    }
    
    func (wp *WorkerPool) Results() <-chan Result {
        return wp.results
    }
    
    func (wp *WorkerPool) Shutdown() {
        close(wp.jobs)
        wp.wg.Wait()
        close(wp.results)
    }
    
    // 使用例
    func demonstrateWorkerPool() {
        pool := NewWorkerPool(5)
        pool.Start()
    
        // 結果を収集
        go func() {
            for result := range pool.Results() {
                if result.Error != nil {
                    fmt.Printf("Job %d failed: %v\n", result.Job.ID, result.Error)
                } else {
                    fmt.Printf("Job %d completed: %s\n", result.Job.ID, result.Output)
                }
            }
        }()
    
        // ジョブを投入
        for i := 0; i < 20; i++ {
            pool.Submit(Job{
                ID:      i,
                Payload: fmt.Sprintf("data-%d", i),
            })
        }
    
        pool.Shutdown()
    }
    

    5.2 パイプラインパターン(高度版)

    package main
    
    import (
        "context"
        "fmt"
    )
    
    type Pipeline struct {
        stages []Stage
    }
    
    type Stage func(context.Context, <-chan interface{}) <-chan interface{}
    
    func NewPipeline(stages ...Stage) *Pipeline {
        return &Pipeline{stages: stages}
    }
    
    func (p *Pipeline) Execute(ctx context.Context, input <-chan interface{}) <-chan interface{} {
        out := input
    
        for _, stage := range p.stages {
            out = stage(ctx, out)
        }
    
        return out
    }
    
    // ステージ例: データ検証
    func validateStage(ctx context.Context, in <-chan interface{}) <-chan interface{} {
        out := make(chan interface{})
    
        go func() {
            defer close(out)
    
            for {
                select {
                case data, ok := <-in:
                    if !ok {
                        return
                    }
    
                    // 検証ロジック
                    if str, ok := data.(string); ok && str != "" {
                        out <- data
                    }
    
                case <-ctx.Done():
                    return
                }
            }
        }()
    
        return out
    }
    
    // ステージ例: データ変換
    func transformStage(ctx context.Context, in <-chan interface{}) <-chan interface{} {
        out := make(chan interface{})
    
        go func() {
            defer close(out)
    
            for {
                select {
                case data, ok := <-in:
                    if !ok {
                        return
                    }
    
                    // 変換ロジック
                    if str, ok := data.(string); ok {
                        out <- fmt.Sprintf("Transformed: %s", str)
                    }
    
                case <-ctx.Done():
                    return
                }
            }
        }()
    
        return out
    }
    
    // ステージ例: データエンリッチメント
    func enrichStage(ctx context.Context, in <-chan interface{}) <-chan interface{} {
        out := make(chan interface{})
    
        go func() {
            defer close(out)
    
            for {
                select {
                case data, ok := <-in:
                    if !ok {
                        return
                    }
    
                    // エンリッチメントロジック
                    if str, ok := data.(string); ok {
                        out <- fmt.Sprintf("%s [enriched]", str)
                    }
    
                case <-ctx.Done():
                    return
                }
            }
        }()
    
        return out
    }
    

    ---

    Part 6: 実世界での適用例

    6.1 Kubernetesでのcontextパターン

    Kubernetesのクライアントライブラリは、contextを徹底的に使用しています:

    // Kubernetes APIクライアントでの使用例(概念的)
    func listPods(ctx context.Context, namespace string) error {
        // タイムアウト付きのコンテキスト
        ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
        defer cancel()
    
        // Kubernetes APIを呼び出し
        pods, err := clientset.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{})
        if err != nil {
            return fmt.Errorf("failed to list pods: %w", err)
        }
    
        for _, pod := range pods.Items {
            fmt.Printf("Pod: %s\n", pod.Name)
        }
    
        return nil
    }
    

    6.2 Prometheusでのsync.Pool使用

    Prometheusは、メトリクス収集時にsync.Poolを使用してGC圧力を軽減しています:

    // Prometheusスタイルのメトリクス収集(概念的)
    var metricPool = sync.Pool{
        New: func() interface{} {
            return &Metric{
                Labels: make(map[string]string),
            }
        },
    }
    
    type Metric struct {
        Name   string
        Value  float64
        Labels map[string]string
    }
    
    func collectMetric(name string, value float64, labels map[string]string) {
        metric := metricPool.Get().(*Metric)
        defer metricPool.Put(metric)
    
        metric.Name = name
        metric.Value = value
        for k, v := range labels {
            metric.Labels[k] = v
        }
    
        // メトリクスを処理
        processMetric(metric)
    
        // クリーンアップ
        metric.Name = ""
        metric.Value = 0
        for k := range metric.Labels {
            delete(metric.Labels, k)
        }
    }
    

    ---

    まとめ

    Day 6では、Goの最も強力な機能である並行処理の高度なパターンを学びました:

  • Context: 並行処理の制御、タイムアウト、キャンセル伝播
  • Sync高度機能: Pool、Map、Once、Condによる効率的な同期
  • エラーハンドリング: ラッピング、カスタムエラー、エラーコレクション
  • リフレクション: 動的な型操作と適切な使用場面
  • プロダクションパターン: ワーカープール、パイプライン

これらのパターンは、大規模システムで実際に使用されており、プロダクションレベルのGoコードを書くための基礎となります。

次のステップ

Day 7では、これまでの知識を統合して実践的なプロジェクトを構築します。HTTPサーバー、データベース統合、テストを含む完全なアプリケーションを作成します。