Day 4: 並行処理の基礎 - 講義
今日の目標
- ゴルーチンの基本を理解する
- sync.WaitGroupの使い方をマスターする
- sync.Mutex/RWMutexで競合状態を防ぐ
- atomic操作を学ぶ
- データ競合の検出方法を知る
---
ゴルーチンとは
ゴルーチンは、Goのランタイムによって管理される軽量なスレッドです。
package main
import (
"fmt"
"time"
)
func sayHello(name string) {
fmt.Println("Hello,", name)
}
func main() {
// ゴルーチンを起動
go sayHello("World")
go sayHello("Go")
go sayHello("Gopher")
// メインゴルーチンが終了すると全て終了
time.Sleep(time.Second)
}
---
sync.WaitGroup
複数のゴルーチンの完了を待つために使用します。
package main
import (
"fmt"
"sync"
)
func worker(id int, wg *sync.WaitGroup) {
defer wg.Done()
fmt.Printf("Worker %d starting\n", id)
// 何かの処理
fmt.Printf("Worker %d done\n", id)
}
func main() {
var wg sync.WaitGroup
for i := 1; i <= 5; i++ {
wg.Add(1)
go worker(i, &wg)
}
wg.Wait()
fmt.Println("All workers completed")
}
---
sync.Mutex
クリティカルセクションを保護するために使用します。
package main
import (
"fmt"
"sync"
)
type SafeCounter struct {
mu sync.Mutex
value int
}
func (c *SafeCounter) Increment() {
c.mu.Lock()
defer c.mu.Unlock()
c.value++
}
func (c *SafeCounter) Value() int {
c.mu.Lock()
defer c.mu.Unlock()
return c.value
}
func main() {
counter := &SafeCounter{}
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
counter.Increment()
}()
}
wg.Wait()
fmt.Println("Final count:", counter.Value())
}
---
sync.RWMutex
読み取りと書き込みを区別するミューテックスです。
package main
import (
"fmt"
"sync"
"time"
)
type Cache struct {
mu sync.RWMutex
data map[string]string
}
func (c *Cache) Get(key string) (string, bool) {
c.mu.RLock()
defer c.mu.RUnlock()
value, ok := c.data[key]
return value, ok
}
func (c *Cache) Set(key, value string) {
c.mu.Lock()
defer c.mu.Unlock()
c.data[key] = value
}
func main() {
cache := &Cache{data: make(map[string]string)}
cache.Set("name", "太郎")
if value, ok := cache.Get("name"); ok {
fmt.Println("Value:", value)
}
}
---
atomic操作
単純なカウンターにはatomic操作が効率的です。
package main
import (
"fmt"
"sync"
"sync/atomic"
)
func main() {
var counter int64
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
atomic.AddInt64(&counter, 1)
}()
}
wg.Wait()
fmt.Println("Counter:", atomic.LoadInt64(&counter))
}
---
データ競合の検出
# -race フラグでデータ競合を検出
go run -race main.go
go test -race ./...
---
並行処理の理論的背景
CSP(Communicating Sequential Processes)
Goの並行処理モデルは、Tony Hoareが1978年に提唱したCSP理論に基づいています。
CSPの核心原則:
- 独立したプロセス(ゴルーチン)が並行実行される
- プロセス間の通信はメッセージパッシング(チャネル)で行う
- 共有メモリではなく、通信によってデータを共有する
Goでの実装:
package main
import (
"fmt"
"time"
)
// プロデューサー:データを生成
func producer(ch chan<- int) {
for i := 0; i < 5; i++ {
fmt.Printf("生成: %d\n", i)
ch <- i
time.Sleep(time.Millisecond * 100)
}
close(ch)
}
// コンシューマー:データを消費
func consumer(ch <-chan int, done chan<- bool) {
for value := range ch {
fmt.Printf("消費: %d\n", value)
time.Sleep(time.Millisecond * 200)
}
done <- true
}
func main() {
ch := make(chan int)
done := make(chan bool)
go producer(ch)
go consumer(ch, done)
<-done
fmt.Println("完了")
}
ゴルーチンの内部実装
Mスレッド:Nゴルーチンのスケジューリングモデル(M:N scheduler)
OS Thread (M)
↓
Processor (P) - ローカルランキュー
↓
Goroutine (G) - スタック(初期2KB、最大1GB)
スケジューラのコンポーネント:
- M (Machine): OSスレッド
- P (Processor): 論理プロセッサ(デフォルトでCPUコア数)
- G (Goroutine): ゴルーチン
package main
import (
"fmt"
"runtime"
"sync"
)
func demonstrateScheduler() {
// CPUコア数を取得
numCPU := runtime.NumCPU()
fmt.Printf("CPU数: %d\n", numCPU)
// GOMAXPROCSを設定(Pの数)
runtime.GOMAXPROCS(numCPU)
// 現在のゴルーチン数
fmt.Printf("開始時のゴルーチン数: %d\n", runtime.NumGoroutine())
var wg sync.WaitGroup
for i := 0; i < 100; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
// 何か処理
}(i)
}
fmt.Printf("実行中のゴルーチン数: %d\n", runtime.NumGoroutine())
wg.Wait()
fmt.Printf("終了時のゴルーチン数: %d\n", runtime.NumGoroutine())
}
---
プロダクション環境でのパターン
ワーカープールパターン
リソースの効率的な利用と並行数の制限
package main
import (
"context"
"fmt"
"sync"
"time"
)
type Job struct {
ID int
Data string
}
type Result struct {
Job Job
Value int
Err error
}
// ワーカープール
type WorkerPool struct {
workers int
jobs chan Job
results chan Result
wg sync.WaitGroup
ctx context.Context
cancelFunc context.CancelFunc
}
func NewWorkerPool(workers int) *WorkerPool {
ctx, cancel := context.WithCancel(context.Background())
return &WorkerPool{
workers: workers,
jobs: make(chan Job, workers*2),
results: make(chan Result, workers*2),
ctx: ctx,
cancelFunc: cancel,
}
}
func (wp *WorkerPool) Start() {
for i := 0; i < wp.workers; i++ {
wp.wg.Add(1)
go wp.worker(i)
}
}
func (wp *WorkerPool) worker(id int) {
defer wp.wg.Done()
for {
select {
case job, ok := <-wp.jobs:
if !ok {
fmt.Printf("Worker %d: ジョブチャネルがクローズされました\n", id)
return
}
fmt.Printf("Worker %d: ジョブ %d を処理中\n", id, job.ID)
// 実際の処理をシミュレート
time.Sleep(time.Millisecond * 100)
result := Result{
Job: job,
Value: len(job.Data),
Err: nil,
}
select {
case wp.results <- result:
case <-wp.ctx.Done():
return
}
case <-wp.ctx.Done():
fmt.Printf("Worker %d: キャンセルされました\n", id)
return
}
}
}
func (wp *WorkerPool) Submit(job Job) error {
select {
case wp.jobs <- job:
return nil
case <-wp.ctx.Done():
return fmt.Errorf("worker pool is closed")
}
}
func (wp *WorkerPool) Results() <-chan Result {
return wp.results
}
func (wp *WorkerPool) Close() {
close(wp.jobs)
wp.wg.Wait()
close(wp.results)
}
func (wp *WorkerPool) Shutdown(timeout time.Duration) {
done := make(chan struct{})
go func() {
wp.Close()
close(done)
}()
select {
case <-done:
fmt.Println("正常にシャットダウンしました")
case <-time.After(timeout):
wp.cancelFunc()
fmt.Println("タイムアウト - 強制終了しました")
}
}
// 使用例
func demonstrateWorkerPool() {
pool := NewWorkerPool(4)
pool.Start()
// ジョブを投入
go func() {
for i := 0; i < 20; i++ {
job := Job{
ID: i,
Data: fmt.Sprintf("data-%d", i),
}
if err := pool.Submit(job); err != nil {
fmt.Printf("ジョブ投入エラー: %v\n", err)
return
}
}
}()
// 結果を収集
go func() {
for result := range pool.Results() {
fmt.Printf("結果: ジョブID=%d, 値=%d\n", result.Job.ID, result.Value)
}
}()
// 3秒後にシャットダウン
time.Sleep(3 * time.Second)
pool.Shutdown(time.Second * 2)
}
セマフォパターン
並行数の制限
package main
import (
"context"
"fmt"
"sync"
"time"
)
// セマフォの実装
type Semaphore struct {
permits chan struct{}
}
func NewSemaphore(maxConcurrent int) *Semaphore {
return &Semaphore{
permits: make(chan struct{}, maxConcurrent),
}
}
func (s *Semaphore) Acquire() {
s.permits <- struct{}{}
}
func (s *Semaphore) Release() {
<-s.permits
}
func (s *Semaphore) TryAcquire(timeout time.Duration) bool {
select {
case s.permits <- struct{}{}:
return true
case <-time.After(timeout):
return false
}
}
// 重い処理のシミュレーション
func heavyTask(id int, sem *Semaphore, wg *sync.WaitGroup) {
defer wg.Done()
// セマフォを取得
sem.Acquire()
defer sem.Release()
fmt.Printf("タスク %d: 開始\n", id)
time.Sleep(time.Second)
fmt.Printf("タスク %d: 完了\n", id)
}
func demonstrateSemaphore() {
sem := NewSemaphore(3) // 最大3並行
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go heavyTask(i, sem, &wg)
}
wg.Wait()
fmt.Println("全タスク完了")
}
パイプラインパターン
データの段階的処理
package main
import (
"fmt"
"sync"
)
// ステージ1: データ生成
func generate(nums ...int) <-chan int {
out := make(chan int)
go func() {
for _, n := range nums {
out <- n
}
close(out)
}()
return out
}
// ステージ2: 2倍にする
func double(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for n := range in {
out <- n * 2
}
close(out)
}()
return out
}
// ステージ3: フィルタリング(偶数のみ)
func filter(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for n := range in {
if n%4 == 0 {
out <- n
}
}
close(out)
}()
return out
}
// ファンアウト: 複数のワーカーに分散
func fanOut(in <-chan int, workers int) []<-chan int {
channels := make([]<-chan int, workers)
for i := 0; i < workers; i++ {
channels[i] = process(in)
}
return channels
}
func process(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for n := range in {
// 重い処理をシミュレート
out <- n * n
}
close(out)
}()
return out
}
// ファンイン: 複数のチャネルを1つに集約
func fanIn(channels ...<-chan int) <-chan int {
out := make(chan int)
var wg sync.WaitGroup
wg.Add(len(channels))
for _, ch := range channels {
go func(c <-chan int) {
defer wg.Done()
for n := range c {
out <- n
}
}(ch)
}
go func() {
wg.Wait()
close(out)
}()
return out
}
func demonstratePipeline() {
// パイプラインの構築
nums := generate(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
doubled := double(nums)
filtered := filter(doubled)
// ファンアウト・ファンイン
workers := fanOut(filtered, 3)
results := fanIn(workers...)
// 結果を出力
for result := range results {
fmt.Println(result)
}
}
---
並行処理のアンチパターン
1. ゴルーチンリーク
// 悪い例:ゴルーチンがリークする
func badLeaking() {
ch := make(chan int)
go func() {
// このゴルーチンは永遠に待ち続ける
value := <-ch
fmt.Println(value)
}()
// チャネルに送信しないまま関数が終了
}
// 良い例:タイムアウトとコンテキストを使用
func goodNoLeaking(ctx context.Context) {
ch := make(chan int)
go func() {
select {
case value := <-ch:
fmt.Println(value)
case <-ctx.Done():
fmt.Println("キャンセルされました")
return
}
}()
// 適切にクリーンアップ
}
2. データ競合
// 悪い例:データ競合
func badRace() {
var counter int
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
counter++ // 競合状態!
}()
}
wg.Wait()
fmt.Println(counter) // 不定な値
}
// 良い例:Mutexで保護
func goodNoRace() {
var counter int
var mu sync.Mutex
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
mu.Lock()
counter++
mu.Unlock()
}()
}
wg.Wait()
fmt.Println(counter) // 常に1000
}
// さらに良い例:atomic操作
func bestNoRace() {
var counter int64
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
atomic.AddInt64(&counter, 1)
}()
}
wg.Wait()
fmt.Println(atomic.LoadInt64(&counter)) // 常に1000
}
3. デッドロック
// 悪い例:デッドロック
func badDeadlock() {
var mu1, mu2 sync.Mutex
// ゴルーチン1
go func() {
mu1.Lock()
time.Sleep(time.Millisecond)
mu2.Lock() // ゴルーチン2がmu2を保持しているので待機
mu2.Unlock()
mu1.Unlock()
}()
// ゴルーチン2
go func() {
mu2.Lock()
time.Sleep(time.Millisecond)
mu1.Lock() // ゴルーチン1がmu1を保持しているので待機
mu1.Unlock()
mu2.Unlock()
}()
time.Sleep(time.Second)
}
// 良い例:ロックの順序を統一
func goodNoDeadlock() {
var mu1, mu2 sync.Mutex
lockBoth := func() {
mu1.Lock()
mu2.Lock()
}
unlockBoth := func() {
mu2.Unlock()
mu1.Unlock()
}
// ゴルーチン1
go func() {
lockBoth()
// 処理
unlockBoth()
}()
// ゴルーチン2
go func() {
lockBoth() // 同じ順序でロック
// 処理
unlockBoth()
}()
}
---
OSSプロジェクトから学ぶパターン
Kubernetesのワーカーキュー
package main
import (
"fmt"
"sync"
"time"
)
// Kubernetesスタイルのワーカーキュー
type WorkQueue struct {
queue []interface{}
mu sync.Mutex
cond *sync.Cond
workers int
wg sync.WaitGroup
closed bool
}
func NewWorkQueue(workers int) *WorkQueue {
wq := &WorkQueue{
queue: make([]interface{}, 0),
workers: workers,
}
wq.cond = sync.NewCond(&wq.mu)
return wq
}
func (wq *WorkQueue) Add(item interface{}) {
wq.mu.Lock()
defer wq.mu.Unlock()
if wq.closed {
return
}
wq.queue = append(wq.queue, item)
wq.cond.Signal() // ワーカーに通知
}
func (wq *WorkQueue) Get() (interface{}, bool) {
wq.mu.Lock()
defer wq.mu.Unlock()
for len(wq.queue) == 0 && !wq.closed {
wq.cond.Wait() // アイテムが追加されるまで待機
}
if wq.closed && len(wq.queue) == 0 {
return nil, false
}
item := wq.queue[0]
wq.queue = wq.queue[1:]
return item, true
}
func (wq *WorkQueue) Start(processFunc func(interface{})) {
for i := 0; i < wq.workers; i++ {
wq.wg.Add(1)
go func(id int) {
defer wq.wg.Done()
for {
item, ok := wq.Get()
if !ok {
fmt.Printf("Worker %d: 終了\n", id)
return
}
fmt.Printf("Worker %d: 処理中\n", id)
processFunc(item)
}
}(i)
}
}
func (wq *WorkQueue) Shutdown() {
wq.mu.Lock()
wq.closed = true
wq.cond.Broadcast() // 全ワーカーに通知
wq.mu.Unlock()
wq.wg.Wait()
}
// 使用例
func demonstrateWorkQueue() {
queue := NewWorkQueue(3)
// ワーカー起動
queue.Start(func(item interface{}) {
// 処理をシミュレート
time.Sleep(time.Millisecond * 100)
fmt.Printf("処理完了: %v\n", item)
})
// アイテムを追加
for i := 0; i < 10; i++ {
queue.Add(fmt.Sprintf("task-%d", i))
}
// シャットダウン
time.Sleep(time.Second)
queue.Shutdown()
}
Prometheusのメトリクス収集パターン
package main
import (
"fmt"
"sync"
"time"
)
// メトリクスコレクター
type MetricsCollector struct {
mu sync.RWMutex
metrics map[string]float64
ticker *time.Ticker
done chan struct{}
}
func NewMetricsCollector(interval time.Duration) *MetricsCollector {
mc := &MetricsCollector{
metrics: make(map[string]float64),
ticker: time.NewTicker(interval),
done: make(chan struct{}),
}
go mc.collect()
return mc
}
func (mc *MetricsCollector) collect() {
for {
select {
case <-mc.ticker.C:
mc.snapshot()
case <-mc.done:
return
}
}
}
func (mc *MetricsCollector) snapshot() {
mc.mu.RLock()
defer mc.mu.RUnlock()
fmt.Println("=== メトリクススナップショット ===")
for name, value := range mc.metrics {
fmt.Printf("%s: %.2f\n", name, value)
}
}
func (mc *MetricsCollector) Set(name string, value float64) {
mc.mu.Lock()
defer mc.mu.Unlock()
mc.metrics[name] = value
}
func (mc *MetricsCollector) Inc(name string) {
mc.mu.Lock()
defer mc.mu.Unlock()
mc.metrics[name]++
}
func (mc *MetricsCollector) Stop() {
mc.ticker.Stop()
close(mc.done)
}
---
高度な並行処理テクニック
コンテキストによるキャンセル伝播
package main
import (
"context"
"fmt"
"time"
)
func doWork(ctx context.Context, name string) {
for {
select {
case <-ctx.Done():
fmt.Printf("%s: キャンセルされました (%v)\n", name, ctx.Err())
return
default:
fmt.Printf("%s: 作業中...\n", name)
time.Sleep(time.Millisecond * 500)
}
}
}
func demonstrateContextCancellation() {
// タイムアウト付きコンテキスト
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
// 複数のワーカーを起動
go doWork(ctx, "Worker-1")
go doWork(ctx, "Worker-2")
go doWork(ctx, "Worker-3")
// キャンセル理由を持つコンテキスト
causeCtx, causeCancel := context.WithCancelCause(context.Background())
go func() {
time.Sleep(time.Second)
causeCancel(fmt.Errorf("カスタム理由でキャンセル"))
}()
go doWork(causeCtx, "Worker-4")
time.Sleep(3 * time.Second)
}
sync.Poolによるオブジェクトの再利用
package main
import (
"bytes"
"fmt"
"sync"
)
var bufferPool = sync.Pool{
New: func() interface{} {
fmt.Println("新しいバッファを作成")
return new(bytes.Buffer)
},
}
func processData(data string) string {
// プールからバッファを取得
buf := bufferPool.Get().(*bytes.Buffer)
defer func() {
buf.Reset() // バッファをクリア
bufferPool.Put(buf) // プールに返却
}()
buf.WriteString(data)
buf.WriteString(" - processed")
return buf.String()
}
func demonstratePool() {
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
result := processData(fmt.Sprintf("data-%d", id))
fmt.Println(result)
}(i)
}
wg.Wait()
}
---
まとめ
並行処理は強力ですが、適切に使用する必要があります:
-raceフラグで競合状態を検出明日はチャネルを使った高度な並行処理パターンを学びます。