Day 4: 並行処理の基礎 - 講義

今日の目標

  • ゴルーチンの基本を理解する
  • sync.WaitGroupの使い方をマスターする
  • sync.Mutex/RWMutexで競合状態を防ぐ
  • atomic操作を学ぶ
  • データ競合の検出方法を知る

---

ゴルーチンとは

ゴルーチンは、Goのランタイムによって管理される軽量なスレッドです。

package main

import (
    "fmt"
    "time"
)

func sayHello(name string) {
    fmt.Println("Hello,", name)
}

func main() {
    // ゴルーチンを起動
    go sayHello("World")
    go sayHello("Go")
    go sayHello("Gopher")

    // メインゴルーチンが終了すると全て終了
    time.Sleep(time.Second)
}

---

sync.WaitGroup

複数のゴルーチンの完了を待つために使用します。

package main

import (
    "fmt"
    "sync"
)

func worker(id int, wg *sync.WaitGroup) {
    defer wg.Done()
    fmt.Printf("Worker %d starting\n", id)
    // 何かの処理
    fmt.Printf("Worker %d done\n", id)
}

func main() {
    var wg sync.WaitGroup

    for i := 1; i <= 5; i++ {
        wg.Add(1)
        go worker(i, &wg)
    }

    wg.Wait()
    fmt.Println("All workers completed")
}

---

sync.Mutex

クリティカルセクションを保護するために使用します。

package main

import (
    "fmt"
    "sync"
)

type SafeCounter struct {
    mu    sync.Mutex
    value int
}

func (c *SafeCounter) Increment() {
    c.mu.Lock()
    defer c.mu.Unlock()
    c.value++
}

func (c *SafeCounter) Value() int {
    c.mu.Lock()
    defer c.mu.Unlock()
    return c.value
}

func main() {
    counter := &SafeCounter{}
    var wg sync.WaitGroup

    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            counter.Increment()
        }()
    }

    wg.Wait()
    fmt.Println("Final count:", counter.Value())
}

---

sync.RWMutex

読み取りと書き込みを区別するミューテックスです。

package main

import (
    "fmt"
    "sync"
    "time"
)

type Cache struct {
    mu   sync.RWMutex
    data map[string]string
}

func (c *Cache) Get(key string) (string, bool) {
    c.mu.RLock()
    defer c.mu.RUnlock()
    value, ok := c.data[key]
    return value, ok
}

func (c *Cache) Set(key, value string) {
    c.mu.Lock()
    defer c.mu.Unlock()
    c.data[key] = value
}

func main() {
    cache := &Cache{data: make(map[string]string)}
    cache.Set("name", "太郎")

    if value, ok := cache.Get("name"); ok {
        fmt.Println("Value:", value)
    }
}

---

atomic操作

単純なカウンターにはatomic操作が効率的です。

package main

import (
    "fmt"
    "sync"
    "sync/atomic"
)

func main() {
    var counter int64

    var wg sync.WaitGroup
    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            atomic.AddInt64(&counter, 1)
        }()
    }

    wg.Wait()
    fmt.Println("Counter:", atomic.LoadInt64(&counter))
}

---

データ競合の検出

# -race フラグでデータ競合を検出
go run -race main.go
go test -race ./...

---

並行処理の理論的背景

CSP(Communicating Sequential Processes)

Goの並行処理モデルは、Tony Hoareが1978年に提唱したCSP理論に基づいています。

CSPの核心原則:

  • 独立したプロセス(ゴルーチン)が並行実行される
  • プロセス間の通信はメッセージパッシング(チャネル)で行う
  • 共有メモリではなく、通信によってデータを共有する

Goでの実装:

package main

import (
    "fmt"
    "time"
)

// プロデューサー:データを生成
func producer(ch chan<- int) {
    for i := 0; i < 5; i++ {
        fmt.Printf("生成: %d\n", i)
        ch <- i
        time.Sleep(time.Millisecond * 100)
    }
    close(ch)
}

// コンシューマー:データを消費
func consumer(ch <-chan int, done chan<- bool) {
    for value := range ch {
        fmt.Printf("消費: %d\n", value)
        time.Sleep(time.Millisecond * 200)
    }
    done <- true
}

func main() {
    ch := make(chan int)
    done := make(chan bool)

    go producer(ch)
    go consumer(ch, done)

    <-done
    fmt.Println("完了")
}

ゴルーチンの内部実装

Mスレッド:Nゴルーチンのスケジューリングモデル(M:N scheduler)

OS Thread (M)
    ↓
Processor (P) - ローカルランキュー
    ↓
Goroutine (G) - スタック(初期2KB、最大1GB)

スケジューラのコンポーネント:

  • M (Machine): OSスレッド
  • P (Processor): 論理プロセッサ(デフォルトでCPUコア数)
  • G (Goroutine): ゴルーチン
  • package main
    
    import (
        "fmt"
        "runtime"
        "sync"
    )
    
    func demonstrateScheduler() {
        // CPUコア数を取得
        numCPU := runtime.NumCPU()
        fmt.Printf("CPU数: %d\n", numCPU)
    
        // GOMAXPROCSを設定(Pの数)
        runtime.GOMAXPROCS(numCPU)
    
        // 現在のゴルーチン数
        fmt.Printf("開始時のゴルーチン数: %d\n", runtime.NumGoroutine())
    
        var wg sync.WaitGroup
        for i := 0; i < 100; i++ {
            wg.Add(1)
            go func(id int) {
                defer wg.Done()
                // 何か処理
            }(i)
        }
    
        fmt.Printf("実行中のゴルーチン数: %d\n", runtime.NumGoroutine())
    
        wg.Wait()
        fmt.Printf("終了時のゴルーチン数: %d\n", runtime.NumGoroutine())
    }
    

    ---

    プロダクション環境でのパターン

    ワーカープールパターン

    リソースの効率的な利用と並行数の制限

    package main
    
    import (
        "context"
        "fmt"
        "sync"
        "time"
    )
    
    type Job struct {
        ID   int
        Data string
    }
    
    type Result struct {
        Job   Job
        Value int
        Err   error
    }
    
    // ワーカープール
    type WorkerPool struct {
        workers    int
        jobs       chan Job
        results    chan Result
        wg         sync.WaitGroup
        ctx        context.Context
        cancelFunc context.CancelFunc
    }
    
    func NewWorkerPool(workers int) *WorkerPool {
        ctx, cancel := context.WithCancel(context.Background())
        return &WorkerPool{
            workers:    workers,
            jobs:       make(chan Job, workers*2),
            results:    make(chan Result, workers*2),
            ctx:        ctx,
            cancelFunc: cancel,
        }
    }
    
    func (wp *WorkerPool) Start() {
        for i := 0; i < wp.workers; i++ {
            wp.wg.Add(1)
            go wp.worker(i)
        }
    }
    
    func (wp *WorkerPool) worker(id int) {
        defer wp.wg.Done()
    
        for {
            select {
            case job, ok := <-wp.jobs:
                if !ok {
                    fmt.Printf("Worker %d: ジョブチャネルがクローズされました\n", id)
                    return
                }
    
                fmt.Printf("Worker %d: ジョブ %d を処理中\n", id, job.ID)
    
                // 実際の処理をシミュレート
                time.Sleep(time.Millisecond * 100)
    
                result := Result{
                    Job:   job,
                    Value: len(job.Data),
                    Err:   nil,
                }
    
                select {
                case wp.results <- result:
                case <-wp.ctx.Done():
                    return
                }
    
            case <-wp.ctx.Done():
                fmt.Printf("Worker %d: キャンセルされました\n", id)
                return
            }
        }
    }
    
    func (wp *WorkerPool) Submit(job Job) error {
        select {
        case wp.jobs <- job:
            return nil
        case <-wp.ctx.Done():
            return fmt.Errorf("worker pool is closed")
        }
    }
    
    func (wp *WorkerPool) Results() <-chan Result {
        return wp.results
    }
    
    func (wp *WorkerPool) Close() {
        close(wp.jobs)
        wp.wg.Wait()
        close(wp.results)
    }
    
    func (wp *WorkerPool) Shutdown(timeout time.Duration) {
        done := make(chan struct{})
    
        go func() {
            wp.Close()
            close(done)
        }()
    
        select {
        case <-done:
            fmt.Println("正常にシャットダウンしました")
        case <-time.After(timeout):
            wp.cancelFunc()
            fmt.Println("タイムアウト - 強制終了しました")
        }
    }
    
    // 使用例
    func demonstrateWorkerPool() {
        pool := NewWorkerPool(4)
        pool.Start()
    
        // ジョブを投入
        go func() {
            for i := 0; i < 20; i++ {
                job := Job{
                    ID:   i,
                    Data: fmt.Sprintf("data-%d", i),
                }
                if err := pool.Submit(job); err != nil {
                    fmt.Printf("ジョブ投入エラー: %v\n", err)
                    return
                }
            }
        }()
    
        // 結果を収集
        go func() {
            for result := range pool.Results() {
                fmt.Printf("結果: ジョブID=%d, 値=%d\n", result.Job.ID, result.Value)
            }
        }()
    
        // 3秒後にシャットダウン
        time.Sleep(3 * time.Second)
        pool.Shutdown(time.Second * 2)
    }
    

    セマフォパターン

    並行数の制限

    package main
    
    import (
        "context"
        "fmt"
        "sync"
        "time"
    )
    
    // セマフォの実装
    type Semaphore struct {
        permits chan struct{}
    }
    
    func NewSemaphore(maxConcurrent int) *Semaphore {
        return &Semaphore{
            permits: make(chan struct{}, maxConcurrent),
        }
    }
    
    func (s *Semaphore) Acquire() {
        s.permits <- struct{}{}
    }
    
    func (s *Semaphore) Release() {
        <-s.permits
    }
    
    func (s *Semaphore) TryAcquire(timeout time.Duration) bool {
        select {
        case s.permits <- struct{}{}:
            return true
        case <-time.After(timeout):
            return false
        }
    }
    
    // 重い処理のシミュレーション
    func heavyTask(id int, sem *Semaphore, wg *sync.WaitGroup) {
        defer wg.Done()
    
        // セマフォを取得
        sem.Acquire()
        defer sem.Release()
    
        fmt.Printf("タスク %d: 開始\n", id)
        time.Sleep(time.Second)
        fmt.Printf("タスク %d: 完了\n", id)
    }
    
    func demonstrateSemaphore() {
        sem := NewSemaphore(3) // 最大3並行
        var wg sync.WaitGroup
    
        for i := 0; i < 10; i++ {
            wg.Add(1)
            go heavyTask(i, sem, &wg)
        }
    
        wg.Wait()
        fmt.Println("全タスク完了")
    }
    

    パイプラインパターン

    データの段階的処理

    package main
    
    import (
        "fmt"
        "sync"
    )
    
    // ステージ1: データ生成
    func generate(nums ...int) <-chan int {
        out := make(chan int)
        go func() {
            for _, n := range nums {
                out <- n
            }
            close(out)
        }()
        return out
    }
    
    // ステージ2: 2倍にする
    func double(in <-chan int) <-chan int {
        out := make(chan int)
        go func() {
            for n := range in {
                out <- n * 2
            }
            close(out)
        }()
        return out
    }
    
    // ステージ3: フィルタリング(偶数のみ)
    func filter(in <-chan int) <-chan int {
        out := make(chan int)
        go func() {
            for n := range in {
                if n%4 == 0 {
                    out <- n
                }
            }
            close(out)
        }()
        return out
    }
    
    // ファンアウト: 複数のワーカーに分散
    func fanOut(in <-chan int, workers int) []<-chan int {
        channels := make([]<-chan int, workers)
        for i := 0; i < workers; i++ {
            channels[i] = process(in)
        }
        return channels
    }
    
    func process(in <-chan int) <-chan int {
        out := make(chan int)
        go func() {
            for n := range in {
                // 重い処理をシミュレート
                out <- n * n
            }
            close(out)
        }()
        return out
    }
    
    // ファンイン: 複数のチャネルを1つに集約
    func fanIn(channels ...<-chan int) <-chan int {
        out := make(chan int)
        var wg sync.WaitGroup
    
        wg.Add(len(channels))
        for _, ch := range channels {
            go func(c <-chan int) {
                defer wg.Done()
                for n := range c {
                    out <- n
                }
            }(ch)
        }
    
        go func() {
            wg.Wait()
            close(out)
        }()
    
        return out
    }
    
    func demonstratePipeline() {
        // パイプラインの構築
        nums := generate(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
        doubled := double(nums)
        filtered := filter(doubled)
    
        // ファンアウト・ファンイン
        workers := fanOut(filtered, 3)
        results := fanIn(workers...)
    
        // 結果を出力
        for result := range results {
            fmt.Println(result)
        }
    }
    

    ---

    並行処理のアンチパターン

    1. ゴルーチンリーク

    // 悪い例:ゴルーチンがリークする
    func badLeaking() {
        ch := make(chan int)
    
        go func() {
            // このゴルーチンは永遠に待ち続ける
            value := <-ch
            fmt.Println(value)
        }()
    
        // チャネルに送信しないまま関数が終了
    }
    
    // 良い例:タイムアウトとコンテキストを使用
    func goodNoLeaking(ctx context.Context) {
        ch := make(chan int)
    
        go func() {
            select {
            case value := <-ch:
                fmt.Println(value)
            case <-ctx.Done():
                fmt.Println("キャンセルされました")
                return
            }
        }()
    
        // 適切にクリーンアップ
    }
    

    2. データ競合

    // 悪い例:データ競合
    func badRace() {
        var counter int
        var wg sync.WaitGroup
    
        for i := 0; i < 1000; i++ {
            wg.Add(1)
            go func() {
                defer wg.Done()
                counter++ // 競合状態!
            }()
        }
    
        wg.Wait()
        fmt.Println(counter) // 不定な値
    }
    
    // 良い例:Mutexで保護
    func goodNoRace() {
        var counter int
        var mu sync.Mutex
        var wg sync.WaitGroup
    
        for i := 0; i < 1000; i++ {
            wg.Add(1)
            go func() {
                defer wg.Done()
                mu.Lock()
                counter++
                mu.Unlock()
            }()
        }
    
        wg.Wait()
        fmt.Println(counter) // 常に1000
    }
    
    // さらに良い例:atomic操作
    func bestNoRace() {
        var counter int64
        var wg sync.WaitGroup
    
        for i := 0; i < 1000; i++ {
            wg.Add(1)
            go func() {
                defer wg.Done()
                atomic.AddInt64(&counter, 1)
            }()
        }
    
        wg.Wait()
        fmt.Println(atomic.LoadInt64(&counter)) // 常に1000
    }
    

    3. デッドロック

    // 悪い例:デッドロック
    func badDeadlock() {
        var mu1, mu2 sync.Mutex
    
        // ゴルーチン1
        go func() {
            mu1.Lock()
            time.Sleep(time.Millisecond)
            mu2.Lock() // ゴルーチン2がmu2を保持しているので待機
            mu2.Unlock()
            mu1.Unlock()
        }()
    
        // ゴルーチン2
        go func() {
            mu2.Lock()
            time.Sleep(time.Millisecond)
            mu1.Lock() // ゴルーチン1がmu1を保持しているので待機
            mu1.Unlock()
            mu2.Unlock()
        }()
    
        time.Sleep(time.Second)
    }
    
    // 良い例:ロックの順序を統一
    func goodNoDeadlock() {
        var mu1, mu2 sync.Mutex
    
        lockBoth := func() {
            mu1.Lock()
            mu2.Lock()
        }
    
        unlockBoth := func() {
            mu2.Unlock()
            mu1.Unlock()
        }
    
        // ゴルーチン1
        go func() {
            lockBoth()
            // 処理
            unlockBoth()
        }()
    
        // ゴルーチン2
        go func() {
            lockBoth() // 同じ順序でロック
            // 処理
            unlockBoth()
        }()
    }
    

    ---

    OSSプロジェクトから学ぶパターン

    Kubernetesのワーカーキュー

    package main
    
    import (
        "fmt"
        "sync"
        "time"
    )
    
    // Kubernetesスタイルのワーカーキュー
    type WorkQueue struct {
        queue   []interface{}
        mu      sync.Mutex
        cond    *sync.Cond
        workers int
        wg      sync.WaitGroup
        closed  bool
    }
    
    func NewWorkQueue(workers int) *WorkQueue {
        wq := &WorkQueue{
            queue:   make([]interface{}, 0),
            workers: workers,
        }
        wq.cond = sync.NewCond(&wq.mu)
        return wq
    }
    
    func (wq *WorkQueue) Add(item interface{}) {
        wq.mu.Lock()
        defer wq.mu.Unlock()
    
        if wq.closed {
            return
        }
    
        wq.queue = append(wq.queue, item)
        wq.cond.Signal() // ワーカーに通知
    }
    
    func (wq *WorkQueue) Get() (interface{}, bool) {
        wq.mu.Lock()
        defer wq.mu.Unlock()
    
        for len(wq.queue) == 0 && !wq.closed {
            wq.cond.Wait() // アイテムが追加されるまで待機
        }
    
        if wq.closed && len(wq.queue) == 0 {
            return nil, false
        }
    
        item := wq.queue[0]
        wq.queue = wq.queue[1:]
        return item, true
    }
    
    func (wq *WorkQueue) Start(processFunc func(interface{})) {
        for i := 0; i < wq.workers; i++ {
            wq.wg.Add(1)
            go func(id int) {
                defer wq.wg.Done()
                for {
                    item, ok := wq.Get()
                    if !ok {
                        fmt.Printf("Worker %d: 終了\n", id)
                        return
                    }
                    fmt.Printf("Worker %d: 処理中\n", id)
                    processFunc(item)
                }
            }(i)
        }
    }
    
    func (wq *WorkQueue) Shutdown() {
        wq.mu.Lock()
        wq.closed = true
        wq.cond.Broadcast() // 全ワーカーに通知
        wq.mu.Unlock()
    
        wq.wg.Wait()
    }
    
    // 使用例
    func demonstrateWorkQueue() {
        queue := NewWorkQueue(3)
    
        // ワーカー起動
        queue.Start(func(item interface{}) {
            // 処理をシミュレート
            time.Sleep(time.Millisecond * 100)
            fmt.Printf("処理完了: %v\n", item)
        })
    
        // アイテムを追加
        for i := 0; i < 10; i++ {
            queue.Add(fmt.Sprintf("task-%d", i))
        }
    
        // シャットダウン
        time.Sleep(time.Second)
        queue.Shutdown()
    }
    

    Prometheusのメトリクス収集パターン

    package main
    
    import (
        "fmt"
        "sync"
        "time"
    )
    
    // メトリクスコレクター
    type MetricsCollector struct {
        mu      sync.RWMutex
        metrics map[string]float64
        ticker  *time.Ticker
        done    chan struct{}
    }
    
    func NewMetricsCollector(interval time.Duration) *MetricsCollector {
        mc := &MetricsCollector{
            metrics: make(map[string]float64),
            ticker:  time.NewTicker(interval),
            done:    make(chan struct{}),
        }
    
        go mc.collect()
        return mc
    }
    
    func (mc *MetricsCollector) collect() {
        for {
            select {
            case <-mc.ticker.C:
                mc.snapshot()
            case <-mc.done:
                return
            }
        }
    }
    
    func (mc *MetricsCollector) snapshot() {
        mc.mu.RLock()
        defer mc.mu.RUnlock()
    
        fmt.Println("=== メトリクススナップショット ===")
        for name, value := range mc.metrics {
            fmt.Printf("%s: %.2f\n", name, value)
        }
    }
    
    func (mc *MetricsCollector) Set(name string, value float64) {
        mc.mu.Lock()
        defer mc.mu.Unlock()
        mc.metrics[name] = value
    }
    
    func (mc *MetricsCollector) Inc(name string) {
        mc.mu.Lock()
        defer mc.mu.Unlock()
        mc.metrics[name]++
    }
    
    func (mc *MetricsCollector) Stop() {
        mc.ticker.Stop()
        close(mc.done)
    }
    

    ---

    高度な並行処理テクニック

    コンテキストによるキャンセル伝播

    package main
    
    import (
        "context"
        "fmt"
        "time"
    )
    
    func doWork(ctx context.Context, name string) {
        for {
            select {
            case <-ctx.Done():
                fmt.Printf("%s: キャンセルされました (%v)\n", name, ctx.Err())
                return
            default:
                fmt.Printf("%s: 作業中...\n", name)
                time.Sleep(time.Millisecond * 500)
            }
        }
    }
    
    func demonstrateContextCancellation() {
        // タイムアウト付きコンテキスト
        ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
        defer cancel()
    
        // 複数のワーカーを起動
        go doWork(ctx, "Worker-1")
        go doWork(ctx, "Worker-2")
        go doWork(ctx, "Worker-3")
    
        // キャンセル理由を持つコンテキスト
        causeCtx, causeCancel := context.WithCancelCause(context.Background())
        go func() {
            time.Sleep(time.Second)
            causeCancel(fmt.Errorf("カスタム理由でキャンセル"))
        }()
    
        go doWork(causeCtx, "Worker-4")
    
        time.Sleep(3 * time.Second)
    }
    

    sync.Poolによるオブジェクトの再利用

    package main
    
    import (
        "bytes"
        "fmt"
        "sync"
    )
    
    var bufferPool = sync.Pool{
        New: func() interface{} {
            fmt.Println("新しいバッファを作成")
            return new(bytes.Buffer)
        },
    }
    
    func processData(data string) string {
        // プールからバッファを取得
        buf := bufferPool.Get().(*bytes.Buffer)
        defer func() {
            buf.Reset()           // バッファをクリア
            bufferPool.Put(buf)   // プールに返却
        }()
    
        buf.WriteString(data)
        buf.WriteString(" - processed")
        return buf.String()
    }
    
    func demonstratePool() {
        var wg sync.WaitGroup
    
        for i := 0; i < 10; i++ {
            wg.Add(1)
            go func(id int) {
                defer wg.Done()
                result := processData(fmt.Sprintf("data-%d", id))
                fmt.Println(result)
            }(i)
        }
    
        wg.Wait()
    }
    

    ---

    まとめ

    並行処理は強力ですが、適切に使用する必要があります:

  • ゴルーチンは軽量 - 数千〜数百万のゴルーチンを作成できる
  • チャネルで通信 - 共有メモリよりメッセージパッシングを優先
  • 適切な同期 - Mutex、RWMutex、atomic、チャネルを適切に選択
  • リソース管理 - ゴルーチンリークを避け、適切にクリーンアップ
  • コンテキスト活用 - キャンセルとタイムアウトの伝播
  • テストは必須 - -raceフラグで競合状態を検出

明日はチャネルを使った高度な並行処理パターンを学びます。