Day 4: 並行処理の基礎 - 解説

目次

従来の並行処理との違い

従来のスレッドベース:

スレッド1 ─┐
          ├─→ [共有メモリ] ←─┐
スレッド2 ─┘                 │
                             │
スレッド3 ────────────────────┘

問題点:
- ロックが複雑
- デッドロックのリスク
- 競合状態が発生しやすい

CSP/Go方式:

Goroutine1 ──→ [Channel] ──→ Goroutine2
                   ↑
                   │
              Goroutine3

利点:
- シンプルな通信モデル
- デッドロックが少ない
- 型安全な通信

Goでの実装例

package main

import (
    "fmt"
    "time"
)

// プロデューサー: データを生成してチャネルに送信
func producer(ch chan<- int, name string) {
    for i := 0; i < 5; i++ {
        fmt.Printf("%s: データ %d を生成\n", name, i)
        ch <- i // チャネルに送信(通信)
        time.Sleep(time.Millisecond * 100)
    }
    close(ch) // 送信完了を通知
}

// コンシューマー: チャネルからデータを受信して処理
func consumer(ch <-chan int, done chan<- bool, name string) {
    for value := range ch { // チャネルがクローズされるまでループ
        fmt.Printf("%s: データ %d を消費\n", name, value)
        time.Sleep(time.Millisecond * 200)
    }
    done <- true // 完了を通知
}

func main() {
    // チャネルを作成(通信路)
    ch := make(chan int)
    done := make(chan bool)

    // プロデューサーとコンシューマーを並行実行
    go producer(ch, "Producer")
    go consumer(ch, done, "Consumer")

    <-done // 完了を待機
    fmt.Println("全処理完了")
}

このコードは、共有メモリなしで、チャネルによる通信のみでデータを受け渡しています。

---

詳細なアルゴリズム分析

G-M-Pモデル: Goのスケジューラ

GoのランタイムはM:N スケジューリングモデルを採用しています。

コンポーネント

  • G (Goroutine): 実行される関数とそのスタック
- 初期スタックサイズ: 2KB - 最大スタックサイズ: 1GB(動的に拡張) - OSスレッドより遥かに軽量

  • M (Machine): OSスレッド
- 実際にCPU上で実行される - デフォルトではCPUコア数と同じ数のMが作成される

  • P (Processor): 論理プロセッサ
- Gのローカルキューを持つ - GOMAXPROCS環境変数で数を制御(デフォルト: CPUコア数)

スケジューリングの流れ

1. Goroutineが作成される(G)
   ↓
2. Pのローカルキューに追加
   ↓
3. MがPから実行可能なGを取得
   ↓
4. Gを実行
   ↓
5. Gがブロック(I/O待ちなど)したら
   - MはPから切り離される
   - 新しいMが作成されるか、アイドルMが使用される
   ↓
6. I/Oが完了したらGは再びキューに入る

Work Stealing

Pのローカルキューが空になったら、他のPのキューからWork Stealing(仕事の盗み)を行います。

package main

import (
    "fmt"
    "runtime"
    "sync"
    "time"
)

func demonstrateWorkStealing() {
    // CPUコア数を取得
    numCPU := runtime.NumCPU()
    fmt.Printf("CPU数: %d\n", numCPU)

    // GOMAXPROCSを設定(Pの数)
    runtime.GOMAXPROCS(numCPU)

    var wg sync.WaitGroup

    // 100個のgoroutineを作成
    for i := 0; i < 100; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()

            // CPUバウンドな処理
            sum := 0
            for j := 0; j < 1000000; j++ {
                sum += j
            }

            // 現在実行中のPとMを確認
            fmt.Printf("Goroutine %d: sum=%d\n", id, sum)
        }(i)
    }

    // 実行中のgoroutine数を監視
    go func() {
        for {
            fmt.Printf("実行中のGoroutine数: %d\n", runtime.NumGoroutine())
            time.Sleep(time.Millisecond * 100)
        }
    }()

    wg.Wait()
}

Work Stealingの仕組み:

P0のキュー: [G1, G2, G3] ←── 実行中
P1のキュー: [G4]        ←── キューが少ない
P2のキュー: []           ←── 空!

Work Stealing発生:
P2がP0のキューから半分盗む
P0のキュー: [G1]
P1のキュー: [G4]
P2のキュー: [G2, G3]     ←── 盗んだ

時間計算量への影響

並行処理は理論上の最大スピードアップを提供しますが、実際の効果は様々な要因に依存します。

Amdahlの法則

スピードアップ = 1 / (S + P/N)

S: 逐次的部分の割合
P: 並列化可能部分の割合(S + P = 1)
N: プロセッサ数

: 90%が並列化可能(S=0.1, P=0.9)

N=2:  スピードアップ = 1 / (0.1 + 0.9/2)  = 1.82倍
N=4:  スピードアップ = 1 / (0.1 + 0.9/4)  = 3.08倍
N=8:  スピードアップ = 1 / (0.1 + 0.9/8)  = 4.71倍
N=16: スピードアップ = 1 / (0.1 + 0.9/16) = 6.40倍
N=∞:  スピードアップ = 1 / 0.1 = 10倍(理論上の上限)

逐次的部分(10%)がボトルネックとなり、無限にプロセッサを増やしても10倍以上にはなりません。

Gustafsonの法則

より楽観的なモデル。問題サイズを大きくすれば、並列化の効果が高まるという考え方。

スピードアップ = S + P×N

実践的な意味:

  • 小さなタスク: Amdahlの法則が支配的
  • 大きなタスク: Gustafsonの法則が適用可能

空間計算量への影響

Goroutineのメモリフットプリント

package main

import (
    "fmt"
    "runtime"
    "sync"
)

func measureGoroutineMemory() {
    var m runtime.MemStats

    // 初期メモリ使用量
    runtime.ReadMemStats(&m)
    initialAlloc := m.Alloc

    var wg sync.WaitGroup
    numGoroutines := 100000

    // 10万個のgoroutineを作成
    for i := 0; i < numGoroutines; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            // 何もせずに待機
            var x [10]byte // 10バイトのローカル変数
            _ = x
            wg.Wait()
        }()
    }

    // メモリ使用量を測定
    runtime.ReadMemStats(&m)
    currentAlloc := m.Alloc

    fmt.Printf("Goroutine数: %d\n", numGoroutines)
    fmt.Printf("追加メモリ使用量: %d MB\n", (currentAlloc-initialAlloc)/1024/1024)
    fmt.Printf("Goroutine1つあたり: %d bytes\n", (currentAlloc-initialAlloc)/uint64(numGoroutines))

    wg.Done() // 全goroutineを解放
}

典型的な結果:

Goroutine数: 100000
追加メモリ使用量: 195 MB
Goroutine1つあたり: 2048 bytes (2KB)

OSスレッドとの比較:

  • OSスレッド: 1MB〜2MB(約1000倍)
  • Goroutine: 2KB(初期)

---

設計原則と設計決定

CSPの思想: "Don't communicate by sharing memory; share memory by communicating"

この原則は、並行プログラミングのパラダイムシフトを表しています。

共有メモリ方式(従来)

// 悪い例:共有メモリで通信
type SharedData struct {
    mu    sync.Mutex
    value int
}

func (s *SharedData) Increment() {
    s.mu.Lock()         // ロック取得
    s.value++           // クリティカルセクション
    s.mu.Unlock()       // ロック解放
}

func (s *SharedData) Get() int {
    s.mu.Lock()
    defer s.mu.Unlock()
    return s.value
}

問題点:

  • ロックの取得・解放が煩雑
  • デッドロックのリスク
  • スケーラビリティの問題(ロック競合)

通信方式(CSP)

// 良い例:チャネルで通信
type ChannelCounter struct {
    requests chan int
    results  chan int
}

func NewChannelCounter() *ChannelCounter {
    c := &ChannelCounter{
        requests: make(chan int),
        results:  make(chan int),
    }

    go c.run() // バックグラウンドでカウンター管理
    return c
}

func (c *ChannelCounter) run() {
    counter := 0
    for cmd := range c.requests {
        switch cmd {
        case 1: // Increment
            counter++
        case 2: // Get
            c.results <- counter
        }
    }
}

func (c *ChannelCounter) Increment() {
    c.requests <- 1
}

func (c *ChannelCounter) Get() int {
    c.requests <- 2
    return <-c.results
}

利点:

  • ロック不要
  • 状態管理が1箇所に集約
  • デッドロックのリスクが低い

設計決定の例: Worker Poolパターン

決定事項1: ワーカー数の設定

// CPU集約的タスク
func cpuIntensiveWorkers() int {
    return runtime.NumCPU() // CPUコア数と同じ
}

// I/O集約的タスク
func ioIntensiveWorkers() int {
    return runtime.NumCPU() * 2 // CPUコア数の2〜4倍
}

// 混合タスク
func mixedWorkers() int {
    return runtime.NumCPU() + 2 // CPUコア数 + 余裕
}

根拠:

  • CPU集約的: コンテキストスイッチのオーバーヘッドを避ける
  • I/O集約的: I/O待機中に他のワーカーを実行
  • 混合: バランスを取る

決定事項2: チャネルのバッファサイズ

// バッファなし
ch1 := make(chan Task) // 同期的通信

// 小さなバッファ
ch2 := make(chan Task, 10) // 短期的なバースト対応

// 大きなバッファ
ch3 := make(chan Task, 1000) // 長期的なバッファリング

// ワーカー数の倍数
ch4 := make(chan Task, workerCount*2) // ワーカー数に比例

トレードオフ:

  • バッファなし: 最も単純だが、送信側がブロックされやすい
  • 小さなバッファ: バーストトラフィックに対応
  • 大きなバッファ: メモリ使用量が増加
  • ワーカー数の倍数: バランスが良い選択

決定事項3: Graceful Shutdownの実装

type WorkerPool struct {
    shutdownOnce sync.Once // 複数回のシャットダウンを防ぐ
}

func (wp *WorkerPool) Shutdown(timeout time.Duration) error {
    var err error

    wp.shutdownOnce.Do(func() {
        // シャットダウン処理(1回のみ実行)
        close(wp.jobs) // 新規ジョブを受け付けない

        done := make(chan struct{})
        go func() {
            wp.wg.Wait()     // 進行中のジョブの完了を待つ
            close(done)
        }()

        select {
        case <-done:
            // 正常終了
            err = nil
        case <-time.After(timeout):
            // タイムアウト - 強制終了
            wp.cancel() // コンテキストキャンセル
            <-done      // それでも完了を待つ
            err = fmt.Errorf("shutdown timeout")
        }
    })

    return err
}

設計決定の理由:

  • sync.Once: 複数goroutineから同時にShutdownが呼ばれても安全
  • タイムアウト: 永遠に待たない(デッドロック回避)
  • 2段階シャットダウン: graceful → タイムアウト後に強制終了

---

Mental Model: 直感的理解

Goroutineの軽量性

従来のスレッドを「重いトラック」、goroutineを「自転車」に例えると:

トラック(OSスレッド):
- 初期コスト: 高い(1-2MB)
- 切り替えコスト: 高い(数マイクロ秒)
- 数の制限: 数千程度
- 用途: 重い荷物を運ぶ

自転車(Goroutine):
- 初期コスト: 低い(2KB)
- 切り替えコスト: 低い(数十ナノ秒)
- 数の制限: 数百万可能
- 用途: 小回りが効く

並行 vs 並列

多くの人が混同する概念ですが、本質的に異なります。

並行 (Concurrency)

定義: 複数のタスクを交互に実行することで、同時進行しているように見せる

: 1人のシェフが複数の料理を作る

時間軸:
0s  : パスタの湯を沸かす   ────┐
2s  : サラダを切る         ────┼────┐
4s  : パスタを確認         ←───┘    │
5s  : ステーキを焼く       ─────────┼────┐
7s  : サラダを盛り付ける   ←────────┘    │
9s  : パスタを茹でる       ←──────────────┘

コード例:

func concurrentCooking() {
    go boilPasta()     // 並行タスク1
    go cutSalad()      // 並行タスク2
    go cookSteak()     // 並行タスク3
    // 1つのCPUコアで交互に実行
}

並列 (Parallelism)

定義: 複数のタスクを同時に実行する

: 3人のシェフが同時に料理を作る

時間軸:
シェフ1: パスタを茹でる  ───────────────
シェフ2: サラダを切る    ───────────────
シェフ3: ステーキを焼く  ───────────────
         ↑ 同時に進行

コード例:

func parallelCooking() {
    runtime.GOMAXPROCS(3) // 3つのCPUコアを使用

    go boilPasta()        // CPUコア1
    go cutSalad()         // CPUコア2
    go cookSteak()        // CPUコア3
    // 3つのCPUコアで同時実行
}

視覚的な違い

並行(Concurrency):
CPU: [Task1] [Task2] [Task1] [Task3] [Task2] [Task1] ...
     ↑ 1つのCPUで高速に切り替え

並列(Parallelism):
CPU1: [Task1] [Task1] [Task1] [Task1] ...
CPU2: [Task2] [Task2] [Task2] [Task2] ...
CPU3: [Task3] [Task3] [Task3] [Task3] ...
      ↑ 複数のCPUで同時実行

データフローとしての並行処理

パイプラインを水の流れに例えると理解しやすくなります。

       ┌─────────┐       ┌─────────┐       ┌─────────┐
Data ──┤ Stage 1 ├──────→┤ Stage 2 ├──────→┤ Stage 3 ├──→ Result
       └─────────┘       └─────────┘       └─────────┘
         ↑ フィルタ         ↑ 変換           ↑ 集約

コード例:

func dataPipeline() {
    // ステージ1: データ生成
    generator := func() <-chan int {
        out := make(chan int)
        go func() {
            for i := 0; i < 100; i++ {
                out <- i
            }
            close(out)
        }()
        return out
    }

    // ステージ2: フィルタリング
    filter := func(in <-chan int) <-chan int {
        out := make(chan int)
        go func() {
            for num := range in {
                if num%2 == 0 { // 偶数のみ
                    out <- num
                }
            }
            close(out)
        }()
        return out
    }

    // ステージ3: 変換
    transform := func(in <-chan int) <-chan int {
        out := make(chan int)
        go func() {
            for num := range in {
                out <- num * num // 2乗
            }
            close(out)
        }()
        return out
    }

    // パイプライン構築
    data := generator()
    filtered := filter(data)
    transformed := transform(filtered)

    // 結果を消費
    for result := range transformed {
        fmt.Println(result)
    }
}

---

Further Exploration: 発展的トピック

1. golang.org/x/sync/errgroup

複数のgoroutineのエラーを集約する便利なパッケージ。

package main

import (
    "context"
    "fmt"
    "golang.org/x/sync/errgroup"
    "time"
)

func demonstrateErrgroup() error {
    g, ctx := errgroup.WithContext(context.Background())

    // 複数のタスクを並行実行
    urls := []string{
        "https://example.com/1",
        "https://example.com/2",
        "https://example.com/3",
    }

    for _, url := range urls {
        url := url // ローカル変数にコピー
        g.Go(func() error {
            return fetch(ctx, url)
        })
    }

    // 全タスクの完了を待ち、最初のエラーを返す
    if err := g.Wait(); err != nil {
        return fmt.Errorf("fetch failed: %w", err)
    }

    return nil
}

func fetch(ctx context.Context, url string) error {
    // フェッチ処理(シミュレーション)
    select {
    case <-time.After(time.Second):
        fmt.Printf("Fetched: %s\n", url)
        return nil
    case <-ctx.Done():
        return ctx.Err()
    }
}

利点:

  • エラーハンドリングが簡潔
  • 最初のエラーで全タスクをキャンセル
  • コンテキスト伝播が自動

2. golang.org/x/sync/semaphore

重み付きセマフォの実装。

package main

import (
    "context"
    "fmt"
    "golang.org/x/sync/semaphore"
    "time"
)

func demonstrateSemaphore() {
    // 最大3並行
    sem := semaphore.NewWeighted(3)
    ctx := context.Background()

    for i := 0; i < 10; i++ {
        // セマフォを取得(重み1)
        if err := sem.Acquire(ctx, 1); err != nil {
            fmt.Printf("セマフォ取得失敗: %v\n", err)
            break
        }

        go func(id int) {
            defer sem.Release(1) // セマフォを解放

            fmt.Printf("タスク %d 開始\n", id)
            time.Sleep(time.Second)
            fmt.Printf("タスク %d 完了\n", id)
        }(i)
    }

    // 全タスクの完了を待つ
    sem.Acquire(ctx, 3) // 全セマフォを取得(全タスク完了)
}

用途:

  • リソース使用量の制限
  • レート制限
  • 接続プールの管理

3. golang.org/x/sync/singleflight

同じキーに対する重複リクエストを抑制。

package main

import (
    "fmt"
    "golang.org/x/sync/singleflight"
    "sync"
    "time"
)

var group singleflight.Group

func expensiveOperation(key string) (interface{}, error) {
    // 重い処理をシミュレート
    time.Sleep(time.Second)
    return fmt.Sprintf("result for %s", key), nil
}

func demonstrateSingleflight() {
    var wg sync.WaitGroup

    // 10個の同時リクエスト(同じキー)
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()

            // 同じキーに対するリクエストは1回のみ実行される
            result, err, shared := group.Do("mykey", func() (interface{}, error) {
                return expensiveOperation("mykey")
            })

            fmt.Printf("Goroutine %d: result=%v, err=%v, shared=%v\n",
                id, result, err, shared)
        }(i)
    }

    wg.Wait()
}

出力例:

Goroutine 3: result=result for mykey, err=<nil>, shared=true
Goroutine 5: result=result for mykey, err=<nil>, shared=true
Goroutine 1: result=result for mykey, err=<nil>, shared=true
...

全てのgoroutineが1回の実行結果を共有します(shared=true)。

用途:

  • キャッシュの stampeding herd 問題の解決
  • API呼び出しの重複防止
  • データベースクエリの最適化

4. 分散システムでの並行処理

分散トレーシング

import (
    "context"
    "go.opentelemetry.io/otel"
    "go.opentelemetry.io/otel/trace"
)

func distributedTask(ctx context.Context) {
    tracer := otel.Tracer("my-service")

    // スパンを作成
    ctx, span := tracer.Start(ctx, "distributedTask")
    defer span.End()

    // 子タスクを並行実行
    var wg sync.WaitGroup
    for i := 0; i < 5; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            childTask(ctx, id) // コンテキストを伝播
        }(i)
    }

    wg.Wait()
}

func childTask(ctx context.Context, id int) {
    tracer := otel.Tracer("my-service")

    // 親スパンから子スパンを作成
    _, span := tracer.Start(ctx, fmt.Sprintf("childTask-%d", id))
    defer span.End()

    // 処理
    time.Sleep(time.Millisecond * 100)
}

利点:

  • 分散システム全体のリクエストフローを可視化
  • パフォーマンスボトルネックの特定
  • エラーの追跡

5. ランタイムの内部動作

スケジューラの調整

package main

import (
    "fmt"
    "runtime"
    "runtime/debug"
)

func tuneRuntime() {
    // GCの頻度を調整
    debug.SetGCPercent(200) // デフォルトは100

    // 最大OSスレッド数を制限
    debug.SetMaxThreads(1000) // デフォルトは10000

    // メモリ上限を設定
    debug.SetMemoryLimit(1024 * 1024 * 1024) // 1GB

    // 統計情報を取得
    var m runtime.MemStats
    runtime.ReadMemStats(&m)

    fmt.Printf("Alloc = %v MB\n", m.Alloc/1024/1024)
    fmt.Printf("TotalAlloc = %v MB\n", m.TotalAlloc/1024/1024)
    fmt.Printf("Sys = %v MB\n", m.Sys/1024/1024)
    fmt.Printf("NumGC = %v\n", m.NumGC)
    fmt.Printf("NumGoroutine = %v\n", runtime.NumGoroutine())
}

スケジューラトレース

# スケジューラの動作を詳細にログ出力
GODEBUG=schedtrace=1000 go run main.go

# 出力例:
# SCHED 0ms: gomaxprocs=8 idleprocs=0 threads=10 spinningthreads=0 idlethreads=3 runqueue=0 [0 0 0 0 0 0 0 0]

読み方:

  • gomaxprocs=8: Pの数(論理プロセッサ)
  • idleprocs=0: アイドル状態のP
  • threads=10: OSスレッド数
  • runqueue=0: グローバルランキューのG数
  • [0 0 0 0 0 0 0 0]: 各Pのローカルキューのサイズ
  • ---

    ビジュアル図解

    G-M-Pモデルの図解

    ┌─────────────────────────────────────────────────────┐
    │                    Go Runtime                        │
    │                                                      │
    │  ┌──────────┐  ┌──────────┐  ┌──────────┐           │
    │  │    P0    │  │    P1    │  │    P2    │  ...     │
    │  │          │  │          │  │          │           │
    │  │ [G1, G2] │  │ [G3, G4] │  │ [G5, G6] │  Local   │
    │  │  Queue   │  │  Queue   │  │  Queue   │  Run     │
    │  └────┬─────┘  └────┬─────┘  └────┬─────┘  Queues  │
    │       │             │             │                 │
    │       │             │             │                 │
    │  ┌────▼─────┐  ┌───▼──────┐  ┌───▼──────┐         │
    │  │    M0    │  │    M1    │  │    M2    │  ...     │
    │  │ OS Thread│  │ OS Thread│  │ OS Thread│          │
    │  └────┬─────┘  └────┬─────┘  └────┬─────┘          │
    └───────┼─────────────┼─────────────┼─────────────────┘
            │             │             │
    ┌───────▼─────────────▼─────────────▼─────────────────┐
    │                 Operating System                     │
    │              (Physical CPU Cores)                    │
    └─────────────────────────────────────────────────────┘
    
    Work Stealing:
    P0 [G1, G2, G3, G4] ────┐
                            │ 盗む
    P1 []              ←────┘ [G3, G4]
    

    データフローの可視化

    パイプラインパターン:
    
    Input → [Generate] → ch1 → [Filter] → ch2 → [Transform] → ch3 → Output
             ↑               ↑              ↑                ↑
          Goroutine       Goroutine      Goroutine       Goroutine
             1               2              3               4
    
    
    ファンアウト・ファンイン:
    
                        ┌─→ [Worker 1] ─┐
                        │                 │
    Input → [Dispatch] ─┼─→ [Worker 2] ─┼→ [Merge] → Output
                        │                 │
                        └─→ [Worker 3] ─┘
    

    チャネルの動作

    バッファなしチャネル:
    
    Sender                Receiver
      │                      │
      ├──→ データ送信 ───────┤  ← ブロック(受信待ち)
      │    (ブロック)        │
      │                      ├──→ データ受信
      ├──→ 送信完了 ←────────┤
      │                      │
    
    
    バッファ付きチャネル (cap=2):
    
    Sender                Buffer [_, _]        Receiver
      │                                            │
      ├──→ データ1送信 ──→ [1, _]                 │
      │    (ブロックしない)                         │
      │                                            │
      ├──→ データ2送信 ──→ [1, 2]                 │
      │    (ブロックしない)                         │
      │                                            │
      ├──→ データ3送信 ──→ [1, 2]                 │
      │    (ブロック)       ↑ 満杯                 │
      │                                            │
      │                     [1, 2] ←──────────────┤ 受信
      │                     [2, _]                 │
      ├──→ 送信完了 ──────→ [2, 3]                 │
    

    ---

    Self-Check Questions

    理解度を確認するための質問です。

    基礎レベル

  • Q: goroutineとOSスレッドの違いは何ですか?
  • A: goroutineはOSスレッドより軽量(2KB vs 1-2MB)で、Goのランタイムによって管理されます。OSスレッドはカーネルによって管理され、コンテキストスイッチのコストが高いです。

  • Q: sync.WaitGroupAddDoneWaitはいつ使いますか?

A: - Add(n): goroutine起動前にカウンタを増やす - Done(): goroutine終了時にカウンタを減らす(通常はdeferで) - Wait(): 全goroutineの完了を待機

  • Q: チャネルをクローズするのは送信側と受信側のどちらですか?
  • A: 送信側がクローズします。受信側がクローズすると、他の受信者がパニックする可能性があります。

    中級レベル

  • Q: sync.Mutexsync.RWMutexの違いは何ですか?どちらを選ぶべきですか?

A: - Mutex: 読み書き両方をロック - RWMutex: 読み取りは複数同時可能、書き込みは排他的 - 選択基準: 読み取りが多い場合はRWMutex、書き込みが多い場合はMutex(オーバーヘッドが少ない)

  • Q: データ競合を検出する方法は?
  • A: go run -raceまたはgo test -raceフラグを使用します。これはGoのrace detectorを有効にします。

  • Q: バッファ付きチャネルとバッファなしチャネルの違いは?

A: - バッファなし: 同期的通信(送受信が同時に発生) - バッファ付き: 非同期的通信(バッファが満杯になるまで送信側はブロックされない)

上級レベル

  • Q: Goのスケジューラの Work Stealing とは何ですか?
  • A: あるP(論理プロセッサ)のローカルキューが空になったとき、他のPのキューから実行可能なgoroutineを「盗む」メカニズム。負荷分散を実現します。

  • Q: Amdahlの法則が示す並行処理の限界とは?
  • A: プログラムの逐次的部分が並列化のボトルネックとなり、プロセッサ数を無限に増やしても、スピードアップは逐次的部分の逆数(1/S)が上限となる。

  • Q: context.Contextの主な用途は?

A: - キャンセルシグナルの伝播 - タイムアウトの設定 - リクエストスコープの値の伝達 - goroutine間の協調的な終了

  • Q: どのような場合にgoroutineリークが発生しますか?

A: - チャネルからの受信を永遠に待ち続ける - selectdefaultやタイムアウトがない - context.Done()を監視していない - 無限ループから抜け出せない

---

Production Considerations

本番環境で並行処理を使用する際の重要な考慮事項です。

1. リソース制限

Goroutineの数を制限する

type ResourceLimiter struct {
    sem chan struct{} // セマフォ
}

func NewResourceLimiter(max int) *ResourceLimiter {
    return &ResourceLimiter{
        sem: make(chan struct{}, max),
    }
}

func (rl *ResourceLimiter) Acquire() {
    rl.sem <- struct{}{}
}

func (rl *ResourceLimiter) Release() {
    <-rl.sem
}

// 使用例
func processWithLimit(items []Item) {
    limiter := NewResourceLimiter(100) // 最大100並行
    var wg sync.WaitGroup

    for _, item := range items {
        limiter.Acquire() // セマフォ取得
        wg.Add(1)

        go func(i Item) {
            defer wg.Done()
            defer limiter.Release() // セマフォ解放

            process(i)
        }(item)
    }

    wg.Wait()
}

理由: 無制限にgoroutineを作成すると、メモリ枯渇やCPU競合が発生します。

2. タイムアウトの設定

func withTimeout(f func() error, timeout time.Duration) error {
    ctx, cancel := context.WithTimeout(context.Background(), timeout)
    defer cancel()

    errCh := make(chan error, 1)

    go func() {
        errCh <- f()
    }()

    select {
    case err := <-errCh:
        return err
    case <-ctx.Done():
        return fmt.Errorf("operation timeout: %w", ctx.Err())
    }
}

理由: タイムアウトがないと、障害時に永遠に待ち続ける可能性があります。

3. パニックからの回復

func safeGoroutine(f func()) {
    go func() {
        defer func() {
            if r := recover(); r != nil {
                // スタックトレースをログに記録
                buf := make([]byte, 1<<16)
                stackSize := runtime.Stack(buf, false)
                log.Printf("PANIC: %v\nStack trace:\n%s", r, buf[:stackSize])
            }
        }()

        f() // 実際の処理
    }()
}

理由: 1つのgoroutineのパニックがシステム全体をダウンさせる可能性があります。

4. メトリクスと監視

type Metrics struct {
    activeGoroutines  int64
    totalProcessed    int64
    totalErrors       int64
    processingTimeMs  int64
}

var globalMetrics Metrics

func recordMetrics() {
    atomic.AddInt64(&globalMetrics.activeGoroutines, 1)
    defer atomic.AddInt64(&globalMetrics.activeGoroutines, -1)

    start := time.Now()
    defer func() {
        duration := time.Since(start).Milliseconds()
        atomic.AddInt64(&globalMetrics.processingTimeMs, duration)
    }()

    // 処理
    if err := doSomething(); err != nil {
        atomic.AddInt64(&globalMetrics.totalErrors, 1)
    } else {
        atomic.AddInt64(&globalMetrics.totalProcessed, 1)
    }
}

// Prometheusなどに定期的にエクスポート
func exportMetrics() {
    ticker := time.NewTicker(10 * time.Second)
    defer ticker.Stop()

    for range ticker.C {
        active := atomic.LoadInt64(&globalMetrics.activeGoroutines)
        processed := atomic.LoadInt64(&globalMetrics.totalProcessed)
        errors := atomic.LoadInt64(&globalMetrics.totalErrors)
        avgTime := atomic.LoadInt64(&globalMetrics.processingTimeMs)

        log.Printf("Metrics: active=%d, processed=%d, errors=%d, avg_time=%dms",
            active, processed, errors, avgTime)
    }
}

理由: 本番環境では、システムの健全性を監視する必要があります。

5. Graceful Shutdownのベストプラクティス

type Server struct {
    shutdownCh chan struct{}
    wg         sync.WaitGroup
}

func (s *Server) Start() {
    // ワーカーを起動
    for i := 0; i < 10; i++ {
        s.wg.Add(1)
        go s.worker(i)
    }
}

func (s *Server) worker(id int) {
    defer s.wg.Done()

    for {
        select {
        case <-s.shutdownCh:
            log.Printf("Worker %d: シャットダウンシグナル受信\n", id)
            return
        default:
            // 通常の処理
            s.doWork(id)
        }
    }
}

func (s *Server) Shutdown(timeout time.Duration) error {
    log.Println("シャットダウン開始...")

    // 1. 新規リクエストの受付を停止
    close(s.shutdownCh)

    // 2. 進行中の処理の完了を待つ(タイムアウト付き)
    done := make(chan struct{})
    go func() {
        s.wg.Wait()
        close(done)
    }()

    select {
    case <-done:
        log.Println("全ワーカーが正常に終了しました")
        return nil
    case <-time.After(timeout):
        log.Println("シャットダウンタイムアウト")
        return fmt.Errorf("shutdown timeout after %v", timeout)
    }
}

// OSシグナルとの統合
func main() {
    server := &Server{
        shutdownCh: make(chan struct{}),
    }
    server.Start()

    // OSシグナルを待機
    sigCh := make(chan os.Signal, 1)
    signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM)

    <-sigCh
    log.Println("シャットダウンシグナル受信")

    if err := server.Shutdown(30 * time.Second); err != nil {
        log.Printf("シャットダウンエラー: %v\n", err)
        os.Exit(1)
    }

    log.Println("正常終了")
}

6. デバッグとプロファイリング

import _ "net/http/pprof"

func enableProfiling() {
    go func() {
        // pprofエンドポイントを公開
        log.Println(http.ListenAndServe("localhost:6060", nil))
    }()
}

// 使用方法:
// go tool pprof http://localhost:6060/debug/pprof/goroutine
// go tool pprof http://localhost:6060/debug/pprof/heap
// go tool pprof http://localhost:6060/debug/pprof/profile?seconds=30

重要なプロファイル:

  • goroutine: goroutineリークの検出
  • heap: メモリリークの検出
  • cpu: CPUボトルネックの特定
  • mutex: ロック競合の分析
  • ---

    まとめ

    並行処理は強力ですが、適切に使用する必要があります:

    重要な原則

  • CSPの思想を理解する: 共有メモリではなく通信で協調
  • G-M-Pモデルを知る: Goのスケジューラの仕組みを理解
  • 並行と並列の違い: 概念を正しく理解する
  • 適切な同期プリミティブ: Mutex、RWMutex、atomic、channelを使い分ける
  • リソース管理: goroutineリークを防ぎ、適切に制限する
  • エラーハンドリング: パニック回復、タイムアウト、キャンセル
  • 監視と最適化: メトリクス収集、プロファイリング
  • 次のステップ

  • Day 5: チャネルを使った高度な並行処理パターン
  • 実践: 実際のプロジェクトで並行処理を適用
  • 発展学習: 分散システム、マイクロサービスでの並行処理

並行処理をマスターすることで、高性能で拡張性の高いGoアプリケーションを構築できるようになります。