第13章: 並行処理基礎

学習目標

この章を終えると、以下ができるようになります:

  • 並行性(concurrency)と並列性(parallelism)の違いを説明できる
  • goroutineを使って軽量な並行処理を実装できる
  • sync.WaitGroupでgoroutineの完了を待機できる
  • レースコンディションを理解し、回避できる
  • GoのGMPモデルとスケジューラーの内部動作を理解できる

並行性と並列性

概念の違い

並行性(Concurrency)

  • 複数のタスクを「交互に」実行する
  • 1つのCPUコアでも実現可能
  • 論理的な構造

並列性(Parallelism)

  • 複数のタスクを「同時に」実行する
  • 複数のCPUコアが必要
  • 物理的な実行

並行性(1コア):
Task A: ----    ----    ----
Task B:     ----    ----    ----
時間: ─────────────────────────→

並列性(2コア):
Core 1 - Task A: ────────────────
Core 2 - Task B: ────────────────
時間: ─────────────────────────→

Rob Pikeの名言

> "Concurrency is about dealing with lots of things at once. Parallelism is about doing lots of things at once." > (並行性は多くのことを扱うこと。並列性は多くのことを同時に行うこと。)

goroutineの基礎

goroutineとは

goroutineは、Goランタイムが管理する軽量なスレッドです。

特徴

  • 初期スタックサイズ: 約2KB(OSスレッドは約1MB)
  • 数千〜数万のgoroutineを同時実行可能
  • goキーワードで起動

最初のgoroutine

package main

import (
    "fmt"
    "time"
)

func sayHello() {
    fmt.Println("Hello from goroutine!")
}

func main() {
    // 通常の関数呼び出し
    sayHello()

    // goroutineとして実行
    go sayHello()

    // goroutineが実行されるまで待機
    time.Sleep(time.Second)

    fmt.Println("Main function ends")
}

出力例:

Hello from goroutine!
Hello from goroutine!
Main function ends

GMPモデル:Goスケジューラーの内部構造

🔑 GMPモデルは、Goランタイムの心臓部です。 これを理解することで、goroutineがなぜ軽量で効率的なのかがわかります。

GMPの構成要素

G (Goroutine):
┌─────────────┐
│  Stack      │  約2KB(初期サイズ)
│  PC         │  プログラムカウンタ
│  Context    │  実行コンテキスト
└─────────────┘

M (Machine/OS Thread):
┌─────────────┐
│  OS Thread  │  実際のOSスレッド
│  G*         │  現在実行中のG
│  P*         │  割り当てられたP
└─────────────┘

P (Processor):
┌─────────────┐
│  Run Queue  │  実行待ちGのキュー
│  mcache     │  メモリキャッシュ
│  状態       │  実行可能/アイドル等
└─────────────┘

GMPの関係図

                 Global Run Queue
                ┌─────────────────┐
                │ G  G  G  G  G   │
                └─────────────────┘
                        ↓
        ┌───────────────┼───────────────┐
        ↓               ↓               ↓
    ┌───────┐       ┌───────┐       ┌───────┐
    │   P0  │       │   P1  │       │   P2  │
    │┌─────┐│       │┌─────┐│       │┌─────┐│
    ││G G G││       ││G G G││       ││G G G││
    │└─────┘│       │└─────┘│       │└─────┘│
    └───┬───┘       └───┬───┘       └───┬───┘
        │               │               │
    ┌───┴───┐       ┌───┴───┐       ┌───┴───┐
    │   M0  │       │   M1  │       │   M2  │
    │(OS スレ)│       │(OS スレ)│       │(OS スレ)│
    └───────┘       └───────┘       └───────┘
        │               │               │
        └───────────────┼───────────────┘
                        ↓
                  OS Kernel
                 (CPU Cores)

🔑 G(Goroutine)の内部構造

Goroutineはruntime.g構造体として実装されています:

// runtime/runtime2.go (簡略版)
type g struct {
    stack       stack    // スタック記述子
    stackguard0 uintptr  // スタックガード(オーバーフロー検出)

    m           *m       // 現在のm
    sched       gobuf    // スケジューリングコンテキスト

    goid        int64    // goroutine ID
    gopc        uintptr  // goroutine作成時のPC

    // その他多数のフィールド...
}

type gobuf struct {
    sp   uintptr  // スタックポインタ
    pc   uintptr  // プログラムカウンタ
    g    *g       // goroutineへの参照
    ret  uintptr  // 戻り値
}

スタックの動的成長

初期状態(2KB):
┌────────────┐ ← SP (Stack Pointer)
│            │
│  空き      │
│            │
├────────────┤
│  ローカル  │
│  変数      │
├────────────┤
│  戻りアド  │
│  レス      │
└────────────┘

スタック成長後(4KB):
┌────────────┐
│            │
│  新しい    │
│  フレーム  │
├────────────┤ ← 古いSP
│            │
│  既存の    │
│  データ    │
│            │
└────────────┘

💡 スタックが不足すると、ランタイムが自動的に新しい領域を割り当て、データをコピーします。

🔑 M(Machine)の内部構造

MはOSスレッドのラッパーです:

// runtime/runtime2.go (簡略版)
type m struct {
    g0      *g      // スケジューリング用goroutine
    curg    *g      // 現在実行中のgoroutine
    p       *p      // 関連付けられたP

    spinning bool   // ワークスティール中か
    blocked  bool   // システムコールでブロック中か

    // その他多数のフィールド...
}

g0の役割

通常のgoroutine実行:
M → curg (ユーザーgoroutine)

スケジューリング時:
M → g0 (スケジューリング用goroutine)
    ↓
    次のgoroutineを選択
    ↓
M → curg (次のgoroutine)

🔑 P(Processor)の内部構造

Pは実行コンテキストとリソースを管理します:

// runtime/runtime2.go (簡略版)
type p struct {
    id          int32   // P ID
    status      uint32  // 状態
    m           *m      // 関連付けられたM

    runqhead    uint32  // ローカルキューのヘッド
    runqtail    uint32  // ローカルキューのテール
    runq        [256]guintptr  // ローカル実行キュー

    runnext     guintptr  // 次に実行するG

    // メモリキャッシュ
    mcache      *mcache

    // その他多数のフィールド...
}

Pの数

// 環境変数で設定可能
GOMAXPROCS=4 go run main.go

// コード内で設定
runtime.GOMAXPROCS(4)

// デフォルト: CPUコア数

goroutineスケジューラーの動作

スケジューリングの基本フロー

1. Goroutine作成
   ┌─────────┐
   │ go f()  │
   └────┬────┘
        ↓
   ┌────────────────┐
   │ 新しいG作成    │
   └────┬───────────┘
        ↓
   ┌────────────────┐
   │ Pのrunqに追加  │
   └────┬───────────┘
        ↓
   ┌────────────────┐
   │ 実行可能状態   │
   └────────────────┘

2. Goroutine実行
   ┌────────────────┐
   │ MがPから取得   │
   └────┬───────────┘
        ↓
   ┌────────────────┐
   │ Gを実行        │
   └────┬───────────┘
        ↓
   ┌────────────────┐
   │ 完了/ブロック  │
   └────┬───────────┘
        ↓
   ┌────────────────┐
   │ 次のGを取得    │
   └────────────────┘

🔑 Work Stealing(ワークスティール)

他のPが忙しい時、アイドルなPが仕事を「盗む」仕組みです。

P0: [G1 G2 G3 G4 G5] (多忙)
P1: []               (アイドル)
P2: [G6 G7]          (普通)

↓ Work Stealing

P0: [G1 G2]          (半分残す)
P1: [G3 G4 G5]       (P0から盗む)
P2: [G6 G7]          (変化なし)

ワークスティールのアルゴリズム

// 疑似コード
func findRunnable() *g {
    // 1. ローカルキューをチェック
    if gp := runqget(p); gp != nil {
        return gp
    }

    // 2. グローバルキューをチェック
    if gp := globrunqget(p); gp != nil {
        return gp
    }

    // 3. 他のPから盗む(Work Stealing)
    for i := 0; i < 4; i++ {  // ランダムに4回試行
        p2 := allp[fastrand() % len(allp)]
        if gp := runqsteal(p, p2); gp != nil {
            return gp
        }
    }

    // 4. ネットワークポーラーをチェック
    if gp := netpoll(); gp != nil {
        return gp
    }

    return nil  // 見つからず
}

スケジューリングのタイミング

Goroutineは以下のタイミングで切り替わります:

1. システムコール
   ┌──────────────┐
   │ G1実行中     │
   └──────┬───────┘
          ↓
   ┌──────────────┐
   │ syscall()    │ ← ブロッキング
   └──────┬───────┘
          ↓
   ┌──────────────┐
   │ P/Mを分離    │ ← PはスピンまたはIdle
   └──────┬───────┘
          ↓
   ┌──────────────┐
   │ M継続        │ ← 新しいMまたは既存M
   │ G1待機       │
   └──────┬───────┘
          ↓
   ┌──────────────┐
   │ syscall完了  │
   └──────┬───────┘
          ↓
   ┌──────────────┐
   │ G1再開       │
   └──────────────┘

2. チャネル操作
   ch <- v  (送信ブロック)
   <-ch     (受信ブロック)

3. time.Sleep
   time.Sleep(time.Second)  ← タイマー待機

4. runtime.Gosched()
   runtime.Gosched()  ← 明示的な譲歩

5. 長時間実行(プリエンプション)
   約10msごとに強制スケジューリング

🔑 プリエンプション(Preemption)

Go 1.14以降、非協調的プリエンプションが導入されました。

従来(協調的プリエンプション):
┌──────────────────┐
│ for {            │
│   // 無限ループ  │  ← 他のgoroutineに譲らない
│ }                │     問題あり!
└──────────────────┘

Go 1.14+(非協調的プリエンプション):
┌──────────────────┐
│ for {            │
│   // 無限ループ  │  ← シグナルで強制中断
│ }                │     ✓ 他のgoroutineも実行可能
└──────────────────┘

シグナルベースのプリエンプション

1. スケジューラーがG1を長時間実行と判断
   ↓
2. SIGURG シグナルをスレッドに送信
   ↓
3. シグナルハンドラーでG1の状態を保存
   ↓
4. スケジューラーに制御を戻す
   ↓
5. 別のGを実行

sync.WaitGroup

WaitGroupとは

time.Sleep()は不適切な待機方法です。sync.WaitGroupを使うことで、goroutineの完了を正確に待機できます。

基本メソッド

  • Add(n): カウンターをn増加
  • Done(): カウンターを1減少
  • Wait(): カウンターが0になるまで待機

🔑 WaitGroupの内部実装

// sync/waitgroup.go (簡略版)
type WaitGroup struct {
    state1 [3]uint32  // カウンター、ウェイター数、セマフォ
}

// 64ビットアラインメントされたカウンター
func (wg *WaitGroup) state() (statep *uint64, semap *uint32) {
    if uintptr(unsafe.Pointer(&wg.state1))%8 == 0 {
        return (*uint64)(unsafe.Pointer(&wg.state1)), &wg.state1[2]
    } else {
        return (*uint64)(unsafe.Pointer(&wg.state1[1])), &wg.state1[0]
    }
}

メモリレイアウト

WaitGroup内部状態(64ビットマシン):
┌──────────────────────────────────┐
│      Counter (32 bit)            │  待機中のgoroutine数
├──────────────────────────────────┤
│      Waiter (32 bit)             │  Wait()呼び出し数
├──────────────────────────────────┤
│      Semaphore (32 bit)          │  セマフォ
└──────────────────────────────────┘

WaitGroupの動作フロー

初期状態:
Counter: 0, Waiter: 0

Add(3):
Counter: 3, Waiter: 0
        ↓
goroutine1起動 → Done() → Counter: 2
goroutine2起動 → Done() → Counter: 1
goroutine3起動 → Done() → Counter: 0
                              ↓
                         Waiterを起床

Wait()呼び出し:
Counter: 0か? → Yes: すぐ返る
              → No:  Waiter++, セマフォ待機

WaitGroupの使用例(詳細)

package main

import (
    "fmt"
    "sync"
    "time"
)

func worker(id int, wg *sync.WaitGroup) {
    defer wg.Done() // 関数終了時にDone()を呼ぶ

    fmt.Printf("Worker %d starting\n", id)
    time.Sleep(time.Second)
    fmt.Printf("Worker %d done\n", id)
}

func main() {
    var wg sync.WaitGroup

    for i := 1; i <= 5; i++ {
        wg.Add(1)           // カウンターを増加
        go worker(i, &wg)   // WaitGroupのポインタを渡す
    }

    wg.Wait() // すべてのgoroutineが完了するまで待機
    fmt.Println("All workers done")
}

⚠️ WaitGroupのよくある間違い

// 悪い例1: Addをgoroutine内で呼ぶ
for i := 0; i < 10; i++ {
    go func() {
        wg.Add(1)  // ❌ レースコンディション!
        defer wg.Done()
        // ...
    }()
}

// 良い例1: Addをgoroutine起動前に呼ぶ
for i := 0; i < 10; i++ {
    wg.Add(1)  // ✓ main goroutineで呼ぶ
    go func() {
        defer wg.Done()
        // ...
    }()
}

// 悪い例2: Doneを忘れる
wg.Add(1)
go func() {
    // wg.Done()を呼び忘れ ❌
    // ...
}()
wg.Wait()  // デッドロック!

// 良い例2: deferでDoneを確実に呼ぶ
wg.Add(1)
go func() {
    defer wg.Done()  // ✓ panicしても必ず呼ばれる
    // ...
}()

レースコンディション

レースコンディションとは

複数のgoroutineが同じメモリ領域に同時アクセスし、少なくとも1つが書き込みを行う状態です。

問題のあるコード

package main

import (
    "fmt"
    "sync"
)

func main() {
    var counter int
    var wg sync.WaitGroup

    // 1000個のgoroutineがcounterを増加
    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            counter++ // レースコンディション!
        }()
    }

    wg.Wait()
    fmt.Println("Counter:", counter) // 期待: 1000, 実際: 900前後(不定)
}

🔑 なぜレースコンディションが起きるのか

counter++は1命令に見えますが、実際は3ステップです:

アセンブリレベル(x86-64):
1. LOAD:  MOV RAX, [counter]   ; メモリからレジスタへ
2. INC:   INC RAX               ; レジスタをインクリメント
3. STORE: MOV [counter], RAX   ; レジスタからメモリへ

Goroutine1とGoroutine2が同時実行:
時刻  G1                G2
1     LOAD counter (0)
2                       LOAD counter (0)
3     INC (1)
4                       INC (1)
5     STORE (1)
6                       STORE (1)

結果: counter = 1(期待値は2)

レースディテクターの使用

Goには競合検出器が組み込まれています。

go run -race main.go

出力例:

==================
WARNING: DATA RACE
Write at 0x00c000014098 by goroutine 7:
  main.main.func1()
      /path/to/main.go:15 +0x3e

Previous write at 0x00c000014098 by goroutine 6:
  main.main.func1()
      /path/to/main.go:15 +0x3e

Goroutine 7 (running) created at:
  main.main()
      /path/to/main.go:13 +0x8e

Goroutine 6 (finished) created at:
  main.main()
      /path/to/main.go:13 +0x8e
==================
Counter: 987
Found 1 data race(s)
exit status 66

💡 レースディテクターは、実行中のメモリアクセスを監視し、競合を検出します。ただし、実行速度は約10倍遅くなります。

並行処理の同期

sync.Mutexによる保護

Mutex(mutual exclusion)は、共有リソースへのアクセスを排他制御します。

package main

import (
    "fmt"
    "sync"
)

type SafeCounter struct {
    mu    sync.Mutex
    value int
}

func (c *SafeCounter) Increment() {
    c.mu.Lock()
    defer c.mu.Unlock()
    c.value++
}

func (c *SafeCounter) Value() int {
    c.mu.Lock()
    defer c.mu.Unlock()
    return c.value
}

func main() {
    counter := SafeCounter{}
    var wg sync.WaitGroup

    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            counter.Increment()
        }()
    }

    wg.Wait()
    fmt.Println("Counter:", counter.Value()) // 確実に1000
}

🔑 Mutexの内部実装

// sync/mutex.go (簡略版)
type Mutex struct {
    state int32   // ロック状態とウェイター数
    sema  uint32  // セマフォ
}

const (
    mutexLocked = 1 << iota  // 1: ロックされている
    mutexWoken               // 2: 起床フラグ
    mutexStarving            // 4: スターベーションモード
    mutexWaiterShift = iota  // 3: ウェイター数のシフト量
)

Mutexの状態遷移

初期状態:
state: 0 (アンロック)

Lock()呼び出し:
state: 1 (ロック)
    ↓
他のgoroutineがLock()呼び出し
    ↓
state: 1 + (1 << 3) = 9 (ロック + 1ウェイター)
    ↓
Unlock()呼び出し
    ↓
ウェイターを起床
state: 0 (アンロック)

ノーマルモードとスターベーションモード

ノーマルモード:
┌─────────┐
│ Unlock  │
└────┬────┘
     ↓
  起床したgoroutineと新しいgoroutineが競争
  (新しいgoroutineが有利: すでにCPU上にいる)

スターベーションモード(1ms以上待機したgoroutine存在):
┌─────────┐
│ Unlock  │
└────┬────┘
     ↓
  起床したgoroutineに直接渡す
  (公平性を保証)

sync.RWMutex(読み書きロック)

読み込みは並行可能、書き込みは排他的にしたい場合に使います。

package main

import (
    "fmt"
    "sync"
    "time"
)

type CachedData struct {
    mu   sync.RWMutex
    data map[string]string
}

func NewCachedData() *CachedData {
    return &CachedData{
        data: make(map[string]string),
    }
}

func (c *CachedData) Get(key string) (string, bool) {
    c.mu.RLock() // 読み取りロック(複数goroutineが同時取得可能)
    defer c.mu.RUnlock()
    value, ok := c.data[key]
    return value, ok
}

func (c *CachedData) Set(key, value string) {
    c.mu.Lock() // 書き込みロック(排他的)
    defer c.mu.Unlock()
    c.data[key] = value
}

func main() {
    cache := NewCachedData()
    var wg sync.WaitGroup

    // 書き込みgoroutine
    wg.Add(1)
    go func() {
        defer wg.Done()
        for i := 0; i < 5; i++ {
            cache.Set(fmt.Sprintf("key%d", i), fmt.Sprintf("value%d", i))
            time.Sleep(100 * time.Millisecond)
        }
    }()

    // 読み込みgoroutine(複数)
    for i := 0; i < 3; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            for j := 0; j < 10; j++ {
                if value, ok := cache.Get("key1"); ok {
                    fmt.Printf("Reader %d: %s\n", id, value)
                }
                time.Sleep(50 * time.Millisecond)
            }
        }(i)
    }

    wg.Wait()
}

🔑 RWMutexの内部実装

// sync/rwmutex.go (簡略版)
type RWMutex struct {
    w           Mutex  // writer用のmutex
    writerSem   uint32 // writer用セマフォ
    readerSem   uint32 // reader用セマフォ
    readerCount int32  // reader数(負の場合はwriter待機中)
    readerWait  int32  // writerが待つreader数
}

RLock/RUnlockの動作

RLock():
1. readerCountをアトミックにインクリメント
2. 負の値(writer待機中)なら、readerSemで待機

RUnlock():
1. readerCountをアトミックにデクリメント
2. 最後のreaderなら、待機中のwriterを起床

Lock/Unlockの動作

Lock():
1. w.Lock()で他のwriterをブロック
2. readerCountから大きな数を引く(負にする)
3. 既存のreaderが完了するまで待機

Unlock():
1. readerCountを復元(正にする)
2. 待機中のreaderをすべて起床
3. w.Unlock()

実践例:並行Webスクレイパー

package main

import (
    "fmt"
    "sync"
    "time"
)

type Result struct {
    URL  string
    Size int
}

// ページのダウンロードをシミュレート
func fetchURL(url string) Result {
    // 実際のHTTPリクエストの代わりにスリープ
    time.Sleep(time.Duration(100+len(url)*10) * time.Millisecond)

    return Result{
        URL:  url,
        Size: len(url) * 100, // 仮のサイズ
    }
}

func worker(id int, urls <-chan string, results chan<- Result, wg *sync.WaitGroup) {
    defer wg.Done()

    for url := range urls {
        fmt.Printf("Worker %d: fetching %s\n", id, url)
        result := fetchURL(url)
        results <- result
    }
}

func main() {
    urls := []string{
        "https://example.com",
        "https://golang.org",
        "https://github.com",
        "https://stackoverflow.com",
        "https://reddit.com",
        "https://twitter.com",
        "https://youtube.com",
        "https://wikipedia.org",
    }

    // チャネルの作成
    urlChannel := make(chan string, len(urls))
    resultChannel := make(chan Result, len(urls))

    var wg sync.WaitGroup
    numWorkers := 3

    // ワーカーの起動
    for i := 1; i <= numWorkers; i++ {
        wg.Add(1)
        go worker(i, urlChannel, resultChannel, &wg)
    }

    // URLをチャネルに送信
    for _, url := range urls {
        urlChannel <- url
    }
    close(urlChannel)

    // 結果の収集用goroutine
    go func() {
        wg.Wait()
        close(resultChannel)
    }()

    // 結果の表示
    totalSize := 0
    for result := range resultChannel {
        fmt.Printf("✓ %s (size: %d bytes)\n", result.URL, result.Size)
        totalSize += result.Size
    }

    fmt.Printf("\n合計サイズ: %d bytes\n", totalSize)
}

ベストプラクティス

1. goroutineのリーク防止

// 悪い例: goroutineがリークする
func leak() {
    ch := make(chan int)
    go func() {
        val := <-ch // チャネルから読み取りを待つが、永遠に来ない
        fmt.Println(val)
    }()
    // chに何も送信せずに関数終了
}

// 良い例: コンテキストでキャンセル可能
func noLeak(ctx context.Context) {
    ch := make(chan int)
    go func() {
        select {
        case val := <-ch:
            fmt.Println(val)
        case <-ctx.Done():
            return
        }
    }()
}

2. エラー処理

type Result struct {
    Value int
    Err   error
}

func worker(id int, wg *sync.WaitGroup, results chan<- Result) {
    defer wg.Done()

    // 処理
    if someError {
        results <- Result{Err: fmt.Errorf("worker %d failed", id)}
        return
    }

    results <- Result{Value: id * 2}
}

3. panicからの回復

func safeWorker(id int, wg *sync.WaitGroup) {
    defer wg.Done()
    defer func() {
        if r := recover(); r != nil {
            fmt.Printf("Worker %d recovered from panic: %v\n", id, r)
        }
    }()

    // 危険な処理
    if id == 3 {
        panic("something went wrong")
    }

    fmt.Printf("Worker %d completed\n", id)
}

自己確認問題

  • GMPモデルのG、M、Pはそれぞれ何を表していますか?
  • goroutineの初期スタックサイズは何KBですか?OSスレッドと比較してください。
  • Work Stealingアルゴリズムとは何ですか?なぜ必要ですか?
  • counter++がレースコンディションを引き起こす理由を、アセンブリレベルで説明してください。
  • sync.WaitGroupのAdd()をgoroutine内で呼ぶとなぜ問題なのですか?
  • sync.Mutexのノーマルモードとスターベーションモードの違いは何ですか?
  • sync.RWMutexを使うべき状況はどのような時ですか?
  • goroutineがリークする原因を3つ挙げてください。
  • Go 1.14で導入された非協調的プリエンプションとは何ですか?
  • レースディテクター(-raceフラグ)はどのように動作しますか?実行速度への影響は?
  • Pの数(GOMAXPROCS)を増やせば常にパフォーマンスが向上しますか?理由も説明してください。
  • sync.Mutexとatomic操作(sync/atomic)の違いは何ですか?

まとめ

この章では、Goの並行処理の基礎とGMPモデルの内部構造を学びました。

重要ポイント

  • GMPモデル: G(Goroutine)、M(Machine/OS Thread)、P(Processor)
  • goroutineスケジューラー: Work StealingとPreemption
  • sync.WaitGroup: goroutineの完了を待機
  • レースコンディション: 共有データへの同時アクセスで発生
  • sync.Mutex: 排他制御で共有リソースを保護
  • sync.RWMutex: 読み取りは並行、書き込みは排他

ベストプラクティス

  • defer wg.Done()でDone()忘れを防ぐ
  • -raceフラグで競合を検出
  • goroutineのリークに注意
  • エラーは適切に伝播
  • panicから回復する仕組みを用意

次の章では、チャネルの内部構造(hchan)と、送受信メカニズムを詳しく学びます。