第13章: 並行処理基礎
学習目標
この章を終えると、以下ができるようになります:
- 並行性(concurrency)と並列性(parallelism)の違いを説明できる
- goroutineを使って軽量な並行処理を実装できる
- sync.WaitGroupでgoroutineの完了を待機できる
- レースコンディションを理解し、回避できる
- GoのGMPモデルとスケジューラーの内部動作を理解できる
並行性と並列性
概念の違い
並行性(Concurrency):
- 複数のタスクを「交互に」実行する
- 1つのCPUコアでも実現可能
- 論理的な構造
並列性(Parallelism):
- 複数のタスクを「同時に」実行する
- 複数のCPUコアが必要
- 物理的な実行
並行性(1コア):
Task A: ---- ---- ----
Task B: ---- ---- ----
時間: ─────────────────────────→
並列性(2コア):
Core 1 - Task A: ────────────────
Core 2 - Task B: ────────────────
時間: ─────────────────────────→
Rob Pikeの名言
> "Concurrency is about dealing with lots of things at once. Parallelism is about doing lots of things at once." > (並行性は多くのことを扱うこと。並列性は多くのことを同時に行うこと。)
goroutineの基礎
goroutineとは
goroutineは、Goランタイムが管理する軽量なスレッドです。
特徴:
- 初期スタックサイズ: 約2KB(OSスレッドは約1MB)
- 数千〜数万のgoroutineを同時実行可能
goキーワードで起動
最初のgoroutine
package main
import (
"fmt"
"time"
)
func sayHello() {
fmt.Println("Hello from goroutine!")
}
func main() {
// 通常の関数呼び出し
sayHello()
// goroutineとして実行
go sayHello()
// goroutineが実行されるまで待機
time.Sleep(time.Second)
fmt.Println("Main function ends")
}
出力例:
Hello from goroutine!
Hello from goroutine!
Main function ends
GMPモデル:Goスケジューラーの内部構造
🔑 GMPモデルは、Goランタイムの心臓部です。 これを理解することで、goroutineがなぜ軽量で効率的なのかがわかります。
GMPの構成要素
G (Goroutine):
┌─────────────┐
│ Stack │ 約2KB(初期サイズ)
│ PC │ プログラムカウンタ
│ Context │ 実行コンテキスト
└─────────────┘
M (Machine/OS Thread):
┌─────────────┐
│ OS Thread │ 実際のOSスレッド
│ G* │ 現在実行中のG
│ P* │ 割り当てられたP
└─────────────┘
P (Processor):
┌─────────────┐
│ Run Queue │ 実行待ちGのキュー
│ mcache │ メモリキャッシュ
│ 状態 │ 実行可能/アイドル等
└─────────────┘
GMPの関係図
Global Run Queue
┌─────────────────┐
│ G G G G G │
└─────────────────┘
↓
┌───────────────┼───────────────┐
↓ ↓ ↓
┌───────┐ ┌───────┐ ┌───────┐
│ P0 │ │ P1 │ │ P2 │
│┌─────┐│ │┌─────┐│ │┌─────┐│
││G G G││ ││G G G││ ││G G G││
│└─────┘│ │└─────┘│ │└─────┘│
└───┬───┘ └───┬───┘ └───┬───┘
│ │ │
┌───┴───┐ ┌───┴───┐ ┌───┴───┐
│ M0 │ │ M1 │ │ M2 │
│(OS スレ)│ │(OS スレ)│ │(OS スレ)│
└───────┘ └───────┘ └───────┘
│ │ │
└───────────────┼───────────────┘
↓
OS Kernel
(CPU Cores)
🔑 G(Goroutine)の内部構造
Goroutineはruntime.g構造体として実装されています:
// runtime/runtime2.go (簡略版)
type g struct {
stack stack // スタック記述子
stackguard0 uintptr // スタックガード(オーバーフロー検出)
m *m // 現在のm
sched gobuf // スケジューリングコンテキスト
goid int64 // goroutine ID
gopc uintptr // goroutine作成時のPC
// その他多数のフィールド...
}
type gobuf struct {
sp uintptr // スタックポインタ
pc uintptr // プログラムカウンタ
g *g // goroutineへの参照
ret uintptr // 戻り値
}
スタックの動的成長:
初期状態(2KB):
┌────────────┐ ← SP (Stack Pointer)
│ │
│ 空き │
│ │
├────────────┤
│ ローカル │
│ 変数 │
├────────────┤
│ 戻りアド │
│ レス │
└────────────┘
スタック成長後(4KB):
┌────────────┐
│ │
│ 新しい │
│ フレーム │
├────────────┤ ← 古いSP
│ │
│ 既存の │
│ データ │
│ │
└────────────┘
💡 スタックが不足すると、ランタイムが自動的に新しい領域を割り当て、データをコピーします。
🔑 M(Machine)の内部構造
MはOSスレッドのラッパーです:
// runtime/runtime2.go (簡略版)
type m struct {
g0 *g // スケジューリング用goroutine
curg *g // 現在実行中のgoroutine
p *p // 関連付けられたP
spinning bool // ワークスティール中か
blocked bool // システムコールでブロック中か
// その他多数のフィールド...
}
g0の役割:
通常のgoroutine実行:
M → curg (ユーザーgoroutine)
スケジューリング時:
M → g0 (スケジューリング用goroutine)
↓
次のgoroutineを選択
↓
M → curg (次のgoroutine)
🔑 P(Processor)の内部構造
Pは実行コンテキストとリソースを管理します:
// runtime/runtime2.go (簡略版)
type p struct {
id int32 // P ID
status uint32 // 状態
m *m // 関連付けられたM
runqhead uint32 // ローカルキューのヘッド
runqtail uint32 // ローカルキューのテール
runq [256]guintptr // ローカル実行キュー
runnext guintptr // 次に実行するG
// メモリキャッシュ
mcache *mcache
// その他多数のフィールド...
}
Pの数:
// 環境変数で設定可能
GOMAXPROCS=4 go run main.go
// コード内で設定
runtime.GOMAXPROCS(4)
// デフォルト: CPUコア数
goroutineスケジューラーの動作
スケジューリングの基本フロー
1. Goroutine作成
┌─────────┐
│ go f() │
└────┬────┘
↓
┌────────────────┐
│ 新しいG作成 │
└────┬───────────┘
↓
┌────────────────┐
│ Pのrunqに追加 │
└────┬───────────┘
↓
┌────────────────┐
│ 実行可能状態 │
└────────────────┘
2. Goroutine実行
┌────────────────┐
│ MがPから取得 │
└────┬───────────┘
↓
┌────────────────┐
│ Gを実行 │
└────┬───────────┘
↓
┌────────────────┐
│ 完了/ブロック │
└────┬───────────┘
↓
┌────────────────┐
│ 次のGを取得 │
└────────────────┘
🔑 Work Stealing(ワークスティール)
他のPが忙しい時、アイドルなPが仕事を「盗む」仕組みです。
P0: [G1 G2 G3 G4 G5] (多忙)
P1: [] (アイドル)
P2: [G6 G7] (普通)
↓ Work Stealing
P0: [G1 G2] (半分残す)
P1: [G3 G4 G5] (P0から盗む)
P2: [G6 G7] (変化なし)
ワークスティールのアルゴリズム:
// 疑似コード
func findRunnable() *g {
// 1. ローカルキューをチェック
if gp := runqget(p); gp != nil {
return gp
}
// 2. グローバルキューをチェック
if gp := globrunqget(p); gp != nil {
return gp
}
// 3. 他のPから盗む(Work Stealing)
for i := 0; i < 4; i++ { // ランダムに4回試行
p2 := allp[fastrand() % len(allp)]
if gp := runqsteal(p, p2); gp != nil {
return gp
}
}
// 4. ネットワークポーラーをチェック
if gp := netpoll(); gp != nil {
return gp
}
return nil // 見つからず
}
スケジューリングのタイミング
Goroutineは以下のタイミングで切り替わります:
1. システムコール
┌──────────────┐
│ G1実行中 │
└──────┬───────┘
↓
┌──────────────┐
│ syscall() │ ← ブロッキング
└──────┬───────┘
↓
┌──────────────┐
│ P/Mを分離 │ ← PはスピンまたはIdle
└──────┬───────┘
↓
┌──────────────┐
│ M継続 │ ← 新しいMまたは既存M
│ G1待機 │
└──────┬───────┘
↓
┌──────────────┐
│ syscall完了 │
└──────┬───────┘
↓
┌──────────────┐
│ G1再開 │
└──────────────┘
2. チャネル操作
ch <- v (送信ブロック)
<-ch (受信ブロック)
3. time.Sleep
time.Sleep(time.Second) ← タイマー待機
4. runtime.Gosched()
runtime.Gosched() ← 明示的な譲歩
5. 長時間実行(プリエンプション)
約10msごとに強制スケジューリング
🔑 プリエンプション(Preemption)
Go 1.14以降、非協調的プリエンプションが導入されました。
従来(協調的プリエンプション):
┌──────────────────┐
│ for { │
│ // 無限ループ │ ← 他のgoroutineに譲らない
│ } │ 問題あり!
└──────────────────┘
Go 1.14+(非協調的プリエンプション):
┌──────────────────┐
│ for { │
│ // 無限ループ │ ← シグナルで強制中断
│ } │ ✓ 他のgoroutineも実行可能
└──────────────────┘
シグナルベースのプリエンプション:
1. スケジューラーがG1を長時間実行と判断
↓
2. SIGURG シグナルをスレッドに送信
↓
3. シグナルハンドラーでG1の状態を保存
↓
4. スケジューラーに制御を戻す
↓
5. 別のGを実行
sync.WaitGroup
WaitGroupとは
time.Sleep()は不適切な待機方法です。sync.WaitGroupを使うことで、goroutineの完了を正確に待機できます。
基本メソッド:
Add(n): カウンターをn増加Done(): カウンターを1減少Wait(): カウンターが0になるまで待機
🔑 WaitGroupの内部実装
// sync/waitgroup.go (簡略版)
type WaitGroup struct {
state1 [3]uint32 // カウンター、ウェイター数、セマフォ
}
// 64ビットアラインメントされたカウンター
func (wg *WaitGroup) state() (statep *uint64, semap *uint32) {
if uintptr(unsafe.Pointer(&wg.state1))%8 == 0 {
return (*uint64)(unsafe.Pointer(&wg.state1)), &wg.state1[2]
} else {
return (*uint64)(unsafe.Pointer(&wg.state1[1])), &wg.state1[0]
}
}
メモリレイアウト:
WaitGroup内部状態(64ビットマシン):
┌──────────────────────────────────┐
│ Counter (32 bit) │ 待機中のgoroutine数
├──────────────────────────────────┤
│ Waiter (32 bit) │ Wait()呼び出し数
├──────────────────────────────────┤
│ Semaphore (32 bit) │ セマフォ
└──────────────────────────────────┘
WaitGroupの動作フロー
初期状態:
Counter: 0, Waiter: 0
Add(3):
Counter: 3, Waiter: 0
↓
goroutine1起動 → Done() → Counter: 2
goroutine2起動 → Done() → Counter: 1
goroutine3起動 → Done() → Counter: 0
↓
Waiterを起床
Wait()呼び出し:
Counter: 0か? → Yes: すぐ返る
→ No: Waiter++, セマフォ待機
WaitGroupの使用例(詳細)
package main
import (
"fmt"
"sync"
"time"
)
func worker(id int, wg *sync.WaitGroup) {
defer wg.Done() // 関数終了時にDone()を呼ぶ
fmt.Printf("Worker %d starting\n", id)
time.Sleep(time.Second)
fmt.Printf("Worker %d done\n", id)
}
func main() {
var wg sync.WaitGroup
for i := 1; i <= 5; i++ {
wg.Add(1) // カウンターを増加
go worker(i, &wg) // WaitGroupのポインタを渡す
}
wg.Wait() // すべてのgoroutineが完了するまで待機
fmt.Println("All workers done")
}
⚠️ WaitGroupのよくある間違い
// 悪い例1: Addをgoroutine内で呼ぶ
for i := 0; i < 10; i++ {
go func() {
wg.Add(1) // ❌ レースコンディション!
defer wg.Done()
// ...
}()
}
// 良い例1: Addをgoroutine起動前に呼ぶ
for i := 0; i < 10; i++ {
wg.Add(1) // ✓ main goroutineで呼ぶ
go func() {
defer wg.Done()
// ...
}()
}
// 悪い例2: Doneを忘れる
wg.Add(1)
go func() {
// wg.Done()を呼び忘れ ❌
// ...
}()
wg.Wait() // デッドロック!
// 良い例2: deferでDoneを確実に呼ぶ
wg.Add(1)
go func() {
defer wg.Done() // ✓ panicしても必ず呼ばれる
// ...
}()
レースコンディション
レースコンディションとは
複数のgoroutineが同じメモリ領域に同時アクセスし、少なくとも1つが書き込みを行う状態です。
問題のあるコード:
package main
import (
"fmt"
"sync"
)
func main() {
var counter int
var wg sync.WaitGroup
// 1000個のgoroutineがcounterを増加
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
counter++ // レースコンディション!
}()
}
wg.Wait()
fmt.Println("Counter:", counter) // 期待: 1000, 実際: 900前後(不定)
}
🔑 なぜレースコンディションが起きるのか
counter++は1命令に見えますが、実際は3ステップです:
アセンブリレベル(x86-64):
1. LOAD: MOV RAX, [counter] ; メモリからレジスタへ
2. INC: INC RAX ; レジスタをインクリメント
3. STORE: MOV [counter], RAX ; レジスタからメモリへ
Goroutine1とGoroutine2が同時実行:
時刻 G1 G2
1 LOAD counter (0)
2 LOAD counter (0)
3 INC (1)
4 INC (1)
5 STORE (1)
6 STORE (1)
結果: counter = 1(期待値は2)
レースディテクターの使用
Goには競合検出器が組み込まれています。
go run -race main.go
出力例:
==================
WARNING: DATA RACE
Write at 0x00c000014098 by goroutine 7:
main.main.func1()
/path/to/main.go:15 +0x3e
Previous write at 0x00c000014098 by goroutine 6:
main.main.func1()
/path/to/main.go:15 +0x3e
Goroutine 7 (running) created at:
main.main()
/path/to/main.go:13 +0x8e
Goroutine 6 (finished) created at:
main.main()
/path/to/main.go:13 +0x8e
==================
Counter: 987
Found 1 data race(s)
exit status 66
💡 レースディテクターは、実行中のメモリアクセスを監視し、競合を検出します。ただし、実行速度は約10倍遅くなります。
並行処理の同期
sync.Mutexによる保護
Mutex(mutual exclusion)は、共有リソースへのアクセスを排他制御します。
package main
import (
"fmt"
"sync"
)
type SafeCounter struct {
mu sync.Mutex
value int
}
func (c *SafeCounter) Increment() {
c.mu.Lock()
defer c.mu.Unlock()
c.value++
}
func (c *SafeCounter) Value() int {
c.mu.Lock()
defer c.mu.Unlock()
return c.value
}
func main() {
counter := SafeCounter{}
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
counter.Increment()
}()
}
wg.Wait()
fmt.Println("Counter:", counter.Value()) // 確実に1000
}
🔑 Mutexの内部実装
// sync/mutex.go (簡略版)
type Mutex struct {
state int32 // ロック状態とウェイター数
sema uint32 // セマフォ
}
const (
mutexLocked = 1 << iota // 1: ロックされている
mutexWoken // 2: 起床フラグ
mutexStarving // 4: スターベーションモード
mutexWaiterShift = iota // 3: ウェイター数のシフト量
)
Mutexの状態遷移:
初期状態:
state: 0 (アンロック)
Lock()呼び出し:
state: 1 (ロック)
↓
他のgoroutineがLock()呼び出し
↓
state: 1 + (1 << 3) = 9 (ロック + 1ウェイター)
↓
Unlock()呼び出し
↓
ウェイターを起床
state: 0 (アンロック)
ノーマルモードとスターベーションモード:
ノーマルモード:
┌─────────┐
│ Unlock │
└────┬────┘
↓
起床したgoroutineと新しいgoroutineが競争
(新しいgoroutineが有利: すでにCPU上にいる)
スターベーションモード(1ms以上待機したgoroutine存在):
┌─────────┐
│ Unlock │
└────┬────┘
↓
起床したgoroutineに直接渡す
(公平性を保証)
sync.RWMutex(読み書きロック)
読み込みは並行可能、書き込みは排他的にしたい場合に使います。
package main
import (
"fmt"
"sync"
"time"
)
type CachedData struct {
mu sync.RWMutex
data map[string]string
}
func NewCachedData() *CachedData {
return &CachedData{
data: make(map[string]string),
}
}
func (c *CachedData) Get(key string) (string, bool) {
c.mu.RLock() // 読み取りロック(複数goroutineが同時取得可能)
defer c.mu.RUnlock()
value, ok := c.data[key]
return value, ok
}
func (c *CachedData) Set(key, value string) {
c.mu.Lock() // 書き込みロック(排他的)
defer c.mu.Unlock()
c.data[key] = value
}
func main() {
cache := NewCachedData()
var wg sync.WaitGroup
// 書き込みgoroutine
wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < 5; i++ {
cache.Set(fmt.Sprintf("key%d", i), fmt.Sprintf("value%d", i))
time.Sleep(100 * time.Millisecond)
}
}()
// 読み込みgoroutine(複数)
for i := 0; i < 3; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
for j := 0; j < 10; j++ {
if value, ok := cache.Get("key1"); ok {
fmt.Printf("Reader %d: %s\n", id, value)
}
time.Sleep(50 * time.Millisecond)
}
}(i)
}
wg.Wait()
}
🔑 RWMutexの内部実装
// sync/rwmutex.go (簡略版)
type RWMutex struct {
w Mutex // writer用のmutex
writerSem uint32 // writer用セマフォ
readerSem uint32 // reader用セマフォ
readerCount int32 // reader数(負の場合はwriter待機中)
readerWait int32 // writerが待つreader数
}
RLock/RUnlockの動作:
RLock():
1. readerCountをアトミックにインクリメント
2. 負の値(writer待機中)なら、readerSemで待機
RUnlock():
1. readerCountをアトミックにデクリメント
2. 最後のreaderなら、待機中のwriterを起床
Lock/Unlockの動作:
Lock():
1. w.Lock()で他のwriterをブロック
2. readerCountから大きな数を引く(負にする)
3. 既存のreaderが完了するまで待機
Unlock():
1. readerCountを復元(正にする)
2. 待機中のreaderをすべて起床
3. w.Unlock()
実践例:並行Webスクレイパー
package main
import (
"fmt"
"sync"
"time"
)
type Result struct {
URL string
Size int
}
// ページのダウンロードをシミュレート
func fetchURL(url string) Result {
// 実際のHTTPリクエストの代わりにスリープ
time.Sleep(time.Duration(100+len(url)*10) * time.Millisecond)
return Result{
URL: url,
Size: len(url) * 100, // 仮のサイズ
}
}
func worker(id int, urls <-chan string, results chan<- Result, wg *sync.WaitGroup) {
defer wg.Done()
for url := range urls {
fmt.Printf("Worker %d: fetching %s\n", id, url)
result := fetchURL(url)
results <- result
}
}
func main() {
urls := []string{
"https://example.com",
"https://golang.org",
"https://github.com",
"https://stackoverflow.com",
"https://reddit.com",
"https://twitter.com",
"https://youtube.com",
"https://wikipedia.org",
}
// チャネルの作成
urlChannel := make(chan string, len(urls))
resultChannel := make(chan Result, len(urls))
var wg sync.WaitGroup
numWorkers := 3
// ワーカーの起動
for i := 1; i <= numWorkers; i++ {
wg.Add(1)
go worker(i, urlChannel, resultChannel, &wg)
}
// URLをチャネルに送信
for _, url := range urls {
urlChannel <- url
}
close(urlChannel)
// 結果の収集用goroutine
go func() {
wg.Wait()
close(resultChannel)
}()
// 結果の表示
totalSize := 0
for result := range resultChannel {
fmt.Printf("✓ %s (size: %d bytes)\n", result.URL, result.Size)
totalSize += result.Size
}
fmt.Printf("\n合計サイズ: %d bytes\n", totalSize)
}
ベストプラクティス
1. goroutineのリーク防止
// 悪い例: goroutineがリークする
func leak() {
ch := make(chan int)
go func() {
val := <-ch // チャネルから読み取りを待つが、永遠に来ない
fmt.Println(val)
}()
// chに何も送信せずに関数終了
}
// 良い例: コンテキストでキャンセル可能
func noLeak(ctx context.Context) {
ch := make(chan int)
go func() {
select {
case val := <-ch:
fmt.Println(val)
case <-ctx.Done():
return
}
}()
}
2. エラー処理
type Result struct {
Value int
Err error
}
func worker(id int, wg *sync.WaitGroup, results chan<- Result) {
defer wg.Done()
// 処理
if someError {
results <- Result{Err: fmt.Errorf("worker %d failed", id)}
return
}
results <- Result{Value: id * 2}
}
3. panicからの回復
func safeWorker(id int, wg *sync.WaitGroup) {
defer wg.Done()
defer func() {
if r := recover(); r != nil {
fmt.Printf("Worker %d recovered from panic: %v\n", id, r)
}
}()
// 危険な処理
if id == 3 {
panic("something went wrong")
}
fmt.Printf("Worker %d completed\n", id)
}
自己確認問題
- GMPモデルのG、M、Pはそれぞれ何を表していますか?
- goroutineの初期スタックサイズは何KBですか?OSスレッドと比較してください。
- Work Stealingアルゴリズムとは何ですか?なぜ必要ですか?
counter++がレースコンディションを引き起こす理由を、アセンブリレベルで説明してください。- sync.WaitGroupのAdd()をgoroutine内で呼ぶとなぜ問題なのですか?
- sync.Mutexのノーマルモードとスターベーションモードの違いは何ですか?
- sync.RWMutexを使うべき状況はどのような時ですか?
- goroutineがリークする原因を3つ挙げてください。
- Go 1.14で導入された非協調的プリエンプションとは何ですか?
- レースディテクター(-raceフラグ)はどのように動作しますか?実行速度への影響は?
- Pの数(GOMAXPROCS)を増やせば常にパフォーマンスが向上しますか?理由も説明してください。
- sync.Mutexとatomic操作(sync/atomic)の違いは何ですか?
まとめ
この章では、Goの並行処理の基礎とGMPモデルの内部構造を学びました。
重要ポイント:
- GMPモデル: G(Goroutine)、M(Machine/OS Thread)、P(Processor)
- goroutineスケジューラー: Work StealingとPreemption
- sync.WaitGroup: goroutineの完了を待機
- レースコンディション: 共有データへの同時アクセスで発生
- sync.Mutex: 排他制御で共有リソースを保護
- sync.RWMutex: 読み取りは並行、書き込みは排他
ベストプラクティス:
defer wg.Done()でDone()忘れを防ぐ-raceフラグで競合を検出- goroutineのリークに注意
- エラーは適切に伝播
- panicから回復する仕組みを用意
次の章では、チャネルの内部構造(hchan)と、送受信メカニズムを詳しく学びます。