Day 5: チャネル - 解説

チャネルパターンの深い理解

なぜチャネルなのか?

従来の並行処理(共有メモリ + ロック)の問題点:

// 悪い例: 共有メモリ + ミューテックス
type Counter struct {
    mu    sync.Mutex
    value int
}

func (c *Counter) Increment() {
    c.mu.Lock()
    c.value++  // クリティカルセクション
    c.mu.Unlock()
}

問題点:

  • デッドロックのリスク(ロックの取得順序のミス)
  • レースコンディション(ロック忘れ)
  • コードの複雑化(ロックの粒度調整)
  • パフォーマンス低下(過度なロック)

チャネルを使った解決:

// 良い例: チャネルによる通信
type Counter struct {
    ch chan int
}

func NewCounter() *Counter {
    c := &Counter{ch: make(chan int)}
    go c.run()
    return c
}

func (c *Counter) run() {
    count := 0
    for range c.ch {
        count++
        fmt.Println("Count:", count)
    }
}

func (c *Counter) Increment() {
    c.ch <- 1
}

利点:

  • ロック不要
  • デッドロックのリスクが低い
  • 読みやすいコード
  • 自然な並行処理表現
  • ---

    チャネルの内部実装

    チャネルの構造(runtime/chan.go)

    Goランタイムのチャネルは以下のような構造体で実装されています:

    type hchan struct {
        qcount   uint           // バッファ内のデータ数
        dataqsiz uint           // バッファサイズ
        buf      unsafe.Pointer // バッファへのポインタ
        elemsize uint16         // 要素のサイズ
        closed   uint32         // クローズフラグ
        sendx    uint           // 送信インデックス
        recvx    uint           // 受信インデックス
        recvq    waitq          // 受信待ちゴルーチンのキュー
        sendq    waitq          // 送信待ちゴルーチンのキュー
        lock     mutex          // 内部ロック
    }
    

    送受信の動作フロー

    送信 (ch <- value) の内部動作:

  • 受信待ちゴルーチンがある場合:
- recvq から1つ取り出す - 直接値を渡す(コピー不要) - 受信側ゴルーチンを起床

  • バッファに空きがある場合:
- バッファに値を書き込む - sendx をインクリメント - すぐにリターン

  • バッファが満杯の場合:
- 現在のゴルーチンを sendq に追加 - ゴルーチンをブロック(スケジューラに戻る)

受信 (value := <-ch) の内部動作:

  • 送信待ちゴルーチンがある場合:
- sendq から1つ取り出す - 値を受け取る - 送信側ゴルーチンを起床

  • バッファにデータがある場合:
- バッファから値を読み取る - recvx をインクリメント - すぐにリターン

  • バッファが空の場合:
- 現在のゴルーチンを recvq に追加 - ゴルーチンをブロック

---

デバッグ技法

1. Race Detector の活用

go run -race main.go
go test -race ./...
go build -race

実例:データ競合の検出

package main

import (
    "fmt"
    "time"
)

var counter int  // 共有変数

func increment() {
    for i := 0; i < 1000; i++ {
        counter++  // データ競合!
    }
}

func main() {
    go increment()
    go increment()
    time.Sleep(time.Second)
    fmt.Println("Counter:", counter)
}

Race Detector の出力:

==================
WARNING: DATA RACE
Write at 0x00c0000b4010 by goroutine 7:
  main.increment()
      /path/to/main.go:10 +0x3e

Previous write at 0x00c0000b4010 by goroutine 6:
  main.increment()
      /path/to/main.go:10 +0x3e

Goroutine 7 (running) created at:
  main.main()
      /path/to/main.go:15 +0x5a

Goroutine 6 (running) created at:
  main.main()
      /path/to/main.go:14 +0x42
==================
Found 1 data race(s)

修正版(チャネル使用):

package main

import "fmt"

func increment(ch chan<- int) {
    for i := 0; i < 1000; i++ {
        ch <- 1
    }
    close(ch)
}

func main() {
    ch1 := make(chan int)
    ch2 := make(chan int)

    go increment(ch1)
    go increment(ch2)

    counter := 0
    for {
        select {
        case _, ok := <-ch1:
            if ok {
                counter++
            } else {
                ch1 = nil
            }
        case _, ok := <-ch2:
            if ok {
                counter++
            } else {
                ch2 = nil
            }
        }
        if ch1 == nil && ch2 == nil {
            break
        }
    }

    fmt.Println("Counter:", counter)  // 2000(正確)
}

2. デッドロック検出

Goランタイムの自動検出:

package main

func main() {
    ch := make(chan int)
    ch <- 42  // デッドロック!
}

出力:

fatal error: all goroutines are asleep - deadlock!

goroutine 1 [chan send]:
main.main()
        /path/to/main.go:5 +0x37

デッドロックのパターン:

  • 循環待機:
ch1 := make(chan int)
ch2 := make(chan int)

go func() {
    ch1 <- <-ch2  // ch2 を待つ
}()

go func() {
    ch2 <- <-ch1  // ch1 を待つ
}()

  • 送信者不在:
ch := make(chan int)
val := <-ch  // 永遠に待つ

  • 受信者不在:
ch := make(chan int)
ch <- 42  // 永遠にブロック

3. pprof によるゴルーチンリーク検出

package main

import (
    "net/http"
    _ "net/http/pprof"
    "time"
)

func leakyWorker() {
    ch := make(chan int)
    <-ch  // 永遠にブロック(リーク!)
}

func main() {
    // pprof サーバー起動
    go func() {
        http.ListenAndServe("localhost:6060", nil)
    }()

    // リークするゴルーチンを大量作成
    for i := 0; i < 1000; i++ {
        go leakyWorker()
    }

    time.Sleep(10 * time.Minute)
}

ゴルーチン数の確認:

# ブラウザで http://localhost:6060/debug/pprof/goroutine にアクセス
# または
curl http://localhost:6060/debug/pprof/goroutine?debug=1

出力例:

goroutine profile: total 1001
1000 @ 0x... 0x... 0x...
#   0x...   main.leakyWorker+0x2a   /path/to/main.go:9

1 @ 0x... 0x... 0x...
#   0x...   main.main+0x12b         /path/to/main.go:20

---

パフォーマンス最適化

1. バッファサイズのチューニング

ベンチマーク例:

package main

import (
    "testing"
)

func BenchmarkUnbuffered(b *testing.B) {
    ch := make(chan int)
    go func() {
        for i := 0; i < b.N; i++ {
            <-ch
        }
    }()
    for i := 0; i < b.N; i++ {
        ch <- i
    }
}

func BenchmarkBuffered10(b *testing.B) {
    ch := make(chan int, 10)
    go func() {
        for i := 0; i < b.N; i++ {
            <-ch
        }
    }()
    for i := 0; i < b.N; i++ {
        ch <- i
    }
}

func BenchmarkBuffered100(b *testing.B) {
    ch := make(chan int, 100)
    go func() {
        for i := 0; i < b.N; i++ {
            <-ch
        }
    }()
    for i := 0; i < b.N; i++ {
        ch <- i
    }
}

実行:

go test -bench=. -benchmem

結果例:

BenchmarkUnbuffered-8     5000000   250 ns/op   0 B/op   0 allocs/op
BenchmarkBuffered10-8    10000000   180 ns/op   0 B/op   0 allocs/op
BenchmarkBuffered100-8   10000000   170 ns/op   0 B/op   0 allocs/op

考察:

  • バッファサイズが大きいほど速い
  • しかし、メモリ使用量とのトレードオフ
  • 最適なサイズはワークロードに依存

2. select の最適化

非効率な例:

for {
    select {
    case v := <-ch1:
        process(v)
    case v := <-ch2:
        process(v)
    case v := <-ch3:
        process(v)
    default:
        time.Sleep(time.Millisecond)  // ビジーウェイト
    }
}

最適化版:

// default 句を削除してブロッキング
for {
    select {
    case v := <-ch1:
        process(v)
    case v := <-ch2:
        process(v)
    case v := <-ch3:
        process(v)
    }
}

---

実世界のインシデント事例

Case 1: Uber のゴルーチンリーク(2018年)

問題:

  • マイクロサービス間通信でゴルーチンがリーク
  • メモリ使用量が徐々に増加
  • 数時間でOOM Killer が発動

原因:

// 悪いコード
func fetchData(ctx context.Context, url string) (string, error) {
    ch := make(chan string)
    go func() {
        // HTTP リクエスト(タイムアウトなし)
        data := slowFetch(url)
        ch <- data  // ctx がキャンセルされても送信しようとする
    }()

    select {
    case data := <-ch:
        return data, nil
    case <-ctx.Done():
        return "", ctx.Err()  // ゴルーチンは残る!
    }
}

修正:

func fetchData(ctx context.Context, url string) (string, error) {
    ch := make(chan string, 1)  // バッファ付きに変更
    go func() {
        data := slowFetch(url)
        select {
        case ch <- data:
        case <-ctx.Done():  // キャンセル検出
            return
        }
    }()

    select {
    case data := <-ch:
        return data, nil
    case <-ctx.Done():
        return "", ctx.Err()
    }
}

Case 2: Cloudflare のチャネルデッドロック(2019年)

問題:

  • DNS リゾルバーでデッドロック発生
  • 全世界のサービスが30分停止

原因:

// 簡略化した再現コード
func resolver(queries <-chan Query, results chan<- Result) {
    for q := range queries {
        results <- resolve(q)  // results が満杯だとブロック
    }
}

func main() {
    queries := make(chan Query, 100)
    results := make(chan Result)  // バッファなし!

    go resolver(queries, results)

    for i := 0; i < 200; i++ {
        queries <- Query{ID: i}
    }
    // results から読み取っていない → デッドロック
}

修正:

func main() {
    queries := make(chan Query, 100)
    results := make(chan Result, 100)  // バッファ追加

    go resolver(queries, results)

    for i := 0; i < 200; i++ {
        queries <- Query{ID: i}
    }

    for i := 0; i < 200; i++ {
        result := <-results
        fmt.Println(result)
    }
}

---

ベストプラクティス総まとめ

1. チャネルの所有権

原則:チャネルを作成した側がクローズする

// 良い例
func producer() <-chan int {
    ch := make(chan int)
    go func() {
        defer close(ch)  // producer がクローズ
        for i := 0; i < 10; i++ {
            ch <- i
        }
    }()
    return ch
}

func consumer(ch <-chan int) {
    for v := range ch {
        fmt.Println(v)
    }
    // クローズしない
}

2. チャネルの方向性

// 送信専用
func send(ch chan<- int) {
    ch <- 42
}

// 受信専用
func receive(ch <-chan int) {
    val := <-ch
}

3. nil チャネルの活用

func merge(ch1, ch2 <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for ch1 != nil || ch2 != nil {
            select {
            case v, ok := <-ch1:
                if !ok {
                    ch1 = nil  // クローズされたらnilに
                    continue
                }
                out <- v
            case v, ok := <-ch2:
                if !ok {
                    ch2 = nil
                    continue
                }
                out <- v
            }
        }
    }()
    return out
}

4. タイムアウトの実装

// 悪い例
time.Sleep(5 * time.Second)

// 良い例
select {
case result := <-ch:
    return result
case <-time.After(5 * time.Second):
    return ErrTimeout
}

// さらに良い例(context使用)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

select {
case result := <-ch:
    return result
case <-ctx.Done():
    return ctx.Err()
}

5. エラー伝播

type Result struct {
    Value int
    Err   error
}

func process(in <-chan int) <-chan Result {
    out := make(chan Result)
    go func() {
        defer close(out)
        for v := range in {
            if v < 0 {
                out <- Result{Err: errors.New("負の値")}
                continue
            }
            out <- Result{Value: v * 2}
        }
    }()
    return out
}

---

よくある質問(FAQ)

Q1: バッファサイズはどう決めるべき?

A: 以下の指針に従ってください:

  • デフォルトは0(アンバッファド) - 同期が必要な場合
  • 1 - 最新の値のみが重要な場合(例: UI更新)
  • ワーカー数と同じ - Worker Poolパターン
  • プロファイリング結果に基づく - 本番環境の負荷テスト後に調整
  • Q2: close(ch) は必須?

    A: 以下の場合は必須です:

  • range でチャネルを読む場合
  • 受信側が終了条件を知る必要がある場合

必須でない場合:

  • プログラム終了まで使い続ける場合
  • ガベージコレクタが回収する

Q3: クローズされたチャネルに送信したらどうなる?

A: パニックが発生します。

ch := make(chan int)
close(ch)
ch <- 42  // panic: send on closed channel

防止策:

// 送信側でのみクローズ
// 受信側は決してクローズしない

Q4: select で全てのケースが準備完了だったら?

A: ランダムに1つが選択されます(公平性を保つため)。

ch1 := make(chan int, 1)
ch2 := make(chan int, 1)
ch1 <- 1
ch2 <- 2

select {
case v := <-ch1:
    fmt.Println("ch1:", v)
case v := <-ch2:
    fmt.Println("ch2:", v)
}
// 出力はランダム

Q5: チャネル vs ミューテックス、どちらを使うべき?

A:

用途 推奨
**データ転送** チャネル
**状態共有** ミューテックス
**所有権の移転** チャネル
**キャッシュ** ミューテックス
**イベント通知** チャネル
**カウンター** atomic またはミューテックス

---

学習チェックリスト

Day 5 を完了するために、以下を確認してください:

  • [ ] アンバッファド vs バッファ付きチャネルの違いを説明できる
  • [ ] select 文でタイムアウトを実装できる
  • [ ] Pipeline パターンを実装できる
  • [ ] Fan-out/Fan-in パターンを実装できる
  • [ ] Worker Pool を実装できる
  • [ ] デッドロックを検出・修正できる
  • [ ] ゴルーチンリークを防げる
  • [ ] Race Detector を使える
  • [ ] context によるキャンセルを実装できる
  • [ ] 実世界の事例から学んだ教訓を理解している
  • ---

    次のステップ

    Day 6 では以下を学びます:

  • context パッケージの詳細
- WithCancel, WithDeadline, WithTimeout, WithValue - context の伝播パターン - 本番環境でのcontext活用

  • 高度なエラーハンドリング
- エラーラッピング(%w) - errors.Is, errors.As - カスタムエラー型

  • リフレクション
- reflect パッケージ - 型情報の取得 - ジェネリックスとの比較

準備すべきこと:

  • Day 5 の課題をすべて完了する
  • チャネルパターンを自分の言葉で説明できるようにする
  • 実際のコードでチャネルを使ってみる

---

参考資料

公式ドキュメント

推奨書籍

  • "Concurrency in Go" by Katherine Cox-Buday
  • "Go in Action" by William Kennedy
  • "The Go Programming Language" by Donovan & Kernighan

オンラインリソース

---

おめでとうございます!Day 5 を完了しました。

チャネルはGo言語の最も強力な機能の一つです。今日学んだパターンは、実際の本番環境で日常的に使用されています。次のDay 6では、これらをさらに高度な形で活用する方法を学びます。