Day 4: 並行処理の基礎 - 解説
目次
- 並行処理の理論的背景
- 詳細なアルゴリズム分析
- 設計原則と設計決定
- Mental Model: 直感的理解
- Further Exploration: 発展的トピック
- ビジュアル図解
- Self-Check Questions
- Production Considerations
- 独立したプロセス: 各プロセス(goroutine)は独立して並行実行される
- メッセージパッシング: プロセス間の通信はチャネルを通じて行う
- 共有メモリの回避: "Don't communicate by sharing memory; share memory by communicating"
---
並行処理の理論的背景
CSP (Communicating Sequential Processes)
Goの並行処理モデルは、Tony Hoareが1978年に提唱したCSP理論に基づいています。
CSPの核心原則
従来の並行処理との違い
従来のスレッドベース:
スレッド1 ─┐
├─→ [共有メモリ] ←─┐
スレッド2 ─┘ │
│
スレッド3 ────────────────────┘
問題点:
- ロックが複雑
- デッドロックのリスク
- 競合状態が発生しやすい
CSP/Go方式:
Goroutine1 ──→ [Channel] ──→ Goroutine2
↑
│
Goroutine3
利点:
- シンプルな通信モデル
- デッドロックが少ない
- 型安全な通信
Goでの実装例
package main
import (
"fmt"
"time"
)
// プロデューサー: データを生成してチャネルに送信
func producer(ch chan<- int, name string) {
for i := 0; i < 5; i++ {
fmt.Printf("%s: データ %d を生成\n", name, i)
ch <- i // チャネルに送信(通信)
time.Sleep(time.Millisecond * 100)
}
close(ch) // 送信完了を通知
}
// コンシューマー: チャネルからデータを受信して処理
func consumer(ch <-chan int, done chan<- bool, name string) {
for value := range ch { // チャネルがクローズされるまでループ
fmt.Printf("%s: データ %d を消費\n", name, value)
time.Sleep(time.Millisecond * 200)
}
done <- true // 完了を通知
}
func main() {
// チャネルを作成(通信路)
ch := make(chan int)
done := make(chan bool)
// プロデューサーとコンシューマーを並行実行
go producer(ch, "Producer")
go consumer(ch, done, "Consumer")
<-done // 完了を待機
fmt.Println("全処理完了")
}
このコードは、共有メモリなしで、チャネルによる通信のみでデータを受け渡しています。
---
詳細なアルゴリズム分析
G-M-Pモデル: Goのスケジューラ
GoのランタイムはM:N スケジューリングモデルを採用しています。
コンポーネント
- G (Goroutine): 実行される関数とそのスタック
- M (Machine): OSスレッド
- P (Processor): 論理プロセッサ
スケジューリングの流れ
1. Goroutineが作成される(G)
↓
2. Pのローカルキューに追加
↓
3. MがPから実行可能なGを取得
↓
4. Gを実行
↓
5. Gがブロック(I/O待ちなど)したら
- MはPから切り離される
- 新しいMが作成されるか、アイドルMが使用される
↓
6. I/Oが完了したらGは再びキューに入る
Work Stealing
Pのローカルキューが空になったら、他のPのキューからWork Stealing(仕事の盗み)を行います。
package main
import (
"fmt"
"runtime"
"sync"
"time"
)
func demonstrateWorkStealing() {
// CPUコア数を取得
numCPU := runtime.NumCPU()
fmt.Printf("CPU数: %d\n", numCPU)
// GOMAXPROCSを設定(Pの数)
runtime.GOMAXPROCS(numCPU)
var wg sync.WaitGroup
// 100個のgoroutineを作成
for i := 0; i < 100; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
// CPUバウンドな処理
sum := 0
for j := 0; j < 1000000; j++ {
sum += j
}
// 現在実行中のPとMを確認
fmt.Printf("Goroutine %d: sum=%d\n", id, sum)
}(i)
}
// 実行中のgoroutine数を監視
go func() {
for {
fmt.Printf("実行中のGoroutine数: %d\n", runtime.NumGoroutine())
time.Sleep(time.Millisecond * 100)
}
}()
wg.Wait()
}
Work Stealingの仕組み:
P0のキュー: [G1, G2, G3] ←── 実行中
P1のキュー: [G4] ←── キューが少ない
P2のキュー: [] ←── 空!
Work Stealing発生:
P2がP0のキューから半分盗む
P0のキュー: [G1]
P1のキュー: [G4]
P2のキュー: [G2, G3] ←── 盗んだ
時間計算量への影響
並行処理は理論上の最大スピードアップを提供しますが、実際の効果は様々な要因に依存します。
Amdahlの法則
スピードアップ = 1 / (S + P/N)
S: 逐次的部分の割合
P: 並列化可能部分の割合(S + P = 1)
N: プロセッサ数
例: 90%が並列化可能(S=0.1, P=0.9)
N=2: スピードアップ = 1 / (0.1 + 0.9/2) = 1.82倍
N=4: スピードアップ = 1 / (0.1 + 0.9/4) = 3.08倍
N=8: スピードアップ = 1 / (0.1 + 0.9/8) = 4.71倍
N=16: スピードアップ = 1 / (0.1 + 0.9/16) = 6.40倍
N=∞: スピードアップ = 1 / 0.1 = 10倍(理論上の上限)
逐次的部分(10%)がボトルネックとなり、無限にプロセッサを増やしても10倍以上にはなりません。
Gustafsonの法則
より楽観的なモデル。問題サイズを大きくすれば、並列化の効果が高まるという考え方。
スピードアップ = S + P×N
実践的な意味:
- 小さなタスク: Amdahlの法則が支配的
- 大きなタスク: Gustafsonの法則が適用可能
空間計算量への影響
Goroutineのメモリフットプリント
package main
import (
"fmt"
"runtime"
"sync"
)
func measureGoroutineMemory() {
var m runtime.MemStats
// 初期メモリ使用量
runtime.ReadMemStats(&m)
initialAlloc := m.Alloc
var wg sync.WaitGroup
numGoroutines := 100000
// 10万個のgoroutineを作成
for i := 0; i < numGoroutines; i++ {
wg.Add(1)
go func() {
defer wg.Done()
// 何もせずに待機
var x [10]byte // 10バイトのローカル変数
_ = x
wg.Wait()
}()
}
// メモリ使用量を測定
runtime.ReadMemStats(&m)
currentAlloc := m.Alloc
fmt.Printf("Goroutine数: %d\n", numGoroutines)
fmt.Printf("追加メモリ使用量: %d MB\n", (currentAlloc-initialAlloc)/1024/1024)
fmt.Printf("Goroutine1つあたり: %d bytes\n", (currentAlloc-initialAlloc)/uint64(numGoroutines))
wg.Done() // 全goroutineを解放
}
典型的な結果:
Goroutine数: 100000
追加メモリ使用量: 195 MB
Goroutine1つあたり: 2048 bytes (2KB)
OSスレッドとの比較:
- OSスレッド: 1MB〜2MB(約1000倍)
- Goroutine: 2KB(初期)
---
設計原則と設計決定
CSPの思想: "Don't communicate by sharing memory; share memory by communicating"
この原則は、並行プログラミングのパラダイムシフトを表しています。
共有メモリ方式(従来)
// 悪い例:共有メモリで通信
type SharedData struct {
mu sync.Mutex
value int
}
func (s *SharedData) Increment() {
s.mu.Lock() // ロック取得
s.value++ // クリティカルセクション
s.mu.Unlock() // ロック解放
}
func (s *SharedData) Get() int {
s.mu.Lock()
defer s.mu.Unlock()
return s.value
}
問題点:
- ロックの取得・解放が煩雑
- デッドロックのリスク
- スケーラビリティの問題(ロック競合)
通信方式(CSP)
// 良い例:チャネルで通信
type ChannelCounter struct {
requests chan int
results chan int
}
func NewChannelCounter() *ChannelCounter {
c := &ChannelCounter{
requests: make(chan int),
results: make(chan int),
}
go c.run() // バックグラウンドでカウンター管理
return c
}
func (c *ChannelCounter) run() {
counter := 0
for cmd := range c.requests {
switch cmd {
case 1: // Increment
counter++
case 2: // Get
c.results <- counter
}
}
}
func (c *ChannelCounter) Increment() {
c.requests <- 1
}
func (c *ChannelCounter) Get() int {
c.requests <- 2
return <-c.results
}
利点:
- ロック不要
- 状態管理が1箇所に集約
- デッドロックのリスクが低い
設計決定の例: Worker Poolパターン
決定事項1: ワーカー数の設定
// CPU集約的タスク
func cpuIntensiveWorkers() int {
return runtime.NumCPU() // CPUコア数と同じ
}
// I/O集約的タスク
func ioIntensiveWorkers() int {
return runtime.NumCPU() * 2 // CPUコア数の2〜4倍
}
// 混合タスク
func mixedWorkers() int {
return runtime.NumCPU() + 2 // CPUコア数 + 余裕
}
根拠:
- CPU集約的: コンテキストスイッチのオーバーヘッドを避ける
- I/O集約的: I/O待機中に他のワーカーを実行
- 混合: バランスを取る
決定事項2: チャネルのバッファサイズ
// バッファなし
ch1 := make(chan Task) // 同期的通信
// 小さなバッファ
ch2 := make(chan Task, 10) // 短期的なバースト対応
// 大きなバッファ
ch3 := make(chan Task, 1000) // 長期的なバッファリング
// ワーカー数の倍数
ch4 := make(chan Task, workerCount*2) // ワーカー数に比例
トレードオフ:
- バッファなし: 最も単純だが、送信側がブロックされやすい
- 小さなバッファ: バーストトラフィックに対応
- 大きなバッファ: メモリ使用量が増加
- ワーカー数の倍数: バランスが良い選択
決定事項3: Graceful Shutdownの実装
type WorkerPool struct {
shutdownOnce sync.Once // 複数回のシャットダウンを防ぐ
}
func (wp *WorkerPool) Shutdown(timeout time.Duration) error {
var err error
wp.shutdownOnce.Do(func() {
// シャットダウン処理(1回のみ実行)
close(wp.jobs) // 新規ジョブを受け付けない
done := make(chan struct{})
go func() {
wp.wg.Wait() // 進行中のジョブの完了を待つ
close(done)
}()
select {
case <-done:
// 正常終了
err = nil
case <-time.After(timeout):
// タイムアウト - 強制終了
wp.cancel() // コンテキストキャンセル
<-done // それでも完了を待つ
err = fmt.Errorf("shutdown timeout")
}
})
return err
}
設計決定の理由:
sync.Once: 複数goroutineから同時にShutdownが呼ばれても安全- タイムアウト: 永遠に待たない(デッドロック回避)
- 2段階シャットダウン: graceful → タイムアウト後に強制終了
---
Mental Model: 直感的理解
Goroutineの軽量性
従来のスレッドを「重いトラック」、goroutineを「自転車」に例えると:
トラック(OSスレッド):
- 初期コスト: 高い(1-2MB)
- 切り替えコスト: 高い(数マイクロ秒)
- 数の制限: 数千程度
- 用途: 重い荷物を運ぶ
自転車(Goroutine):
- 初期コスト: 低い(2KB)
- 切り替えコスト: 低い(数十ナノ秒)
- 数の制限: 数百万可能
- 用途: 小回りが効く
並行 vs 並列
多くの人が混同する概念ですが、本質的に異なります。
並行 (Concurrency)
定義: 複数のタスクを交互に実行することで、同時進行しているように見せる
例: 1人のシェフが複数の料理を作る
時間軸:
0s : パスタの湯を沸かす ────┐
2s : サラダを切る ────┼────┐
4s : パスタを確認 ←───┘ │
5s : ステーキを焼く ─────────┼────┐
7s : サラダを盛り付ける ←────────┘ │
9s : パスタを茹でる ←──────────────┘
コード例:
func concurrentCooking() {
go boilPasta() // 並行タスク1
go cutSalad() // 並行タスク2
go cookSteak() // 並行タスク3
// 1つのCPUコアで交互に実行
}
並列 (Parallelism)
定義: 複数のタスクを同時に実行する
例: 3人のシェフが同時に料理を作る
時間軸:
シェフ1: パスタを茹でる ───────────────
シェフ2: サラダを切る ───────────────
シェフ3: ステーキを焼く ───────────────
↑ 同時に進行
コード例:
func parallelCooking() {
runtime.GOMAXPROCS(3) // 3つのCPUコアを使用
go boilPasta() // CPUコア1
go cutSalad() // CPUコア2
go cookSteak() // CPUコア3
// 3つのCPUコアで同時実行
}
視覚的な違い
並行(Concurrency):
CPU: [Task1] [Task2] [Task1] [Task3] [Task2] [Task1] ...
↑ 1つのCPUで高速に切り替え
並列(Parallelism):
CPU1: [Task1] [Task1] [Task1] [Task1] ...
CPU2: [Task2] [Task2] [Task2] [Task2] ...
CPU3: [Task3] [Task3] [Task3] [Task3] ...
↑ 複数のCPUで同時実行
データフローとしての並行処理
パイプラインを水の流れに例えると理解しやすくなります。
┌─────────┐ ┌─────────┐ ┌─────────┐
Data ──┤ Stage 1 ├──────→┤ Stage 2 ├──────→┤ Stage 3 ├──→ Result
└─────────┘ └─────────┘ └─────────┘
↑ フィルタ ↑ 変換 ↑ 集約
コード例:
func dataPipeline() {
// ステージ1: データ生成
generator := func() <-chan int {
out := make(chan int)
go func() {
for i := 0; i < 100; i++ {
out <- i
}
close(out)
}()
return out
}
// ステージ2: フィルタリング
filter := func(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for num := range in {
if num%2 == 0 { // 偶数のみ
out <- num
}
}
close(out)
}()
return out
}
// ステージ3: 変換
transform := func(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for num := range in {
out <- num * num // 2乗
}
close(out)
}()
return out
}
// パイプライン構築
data := generator()
filtered := filter(data)
transformed := transform(filtered)
// 結果を消費
for result := range transformed {
fmt.Println(result)
}
}
---
Further Exploration: 発展的トピック
1. golang.org/x/sync/errgroup
複数のgoroutineのエラーを集約する便利なパッケージ。
package main
import (
"context"
"fmt"
"golang.org/x/sync/errgroup"
"time"
)
func demonstrateErrgroup() error {
g, ctx := errgroup.WithContext(context.Background())
// 複数のタスクを並行実行
urls := []string{
"https://example.com/1",
"https://example.com/2",
"https://example.com/3",
}
for _, url := range urls {
url := url // ローカル変数にコピー
g.Go(func() error {
return fetch(ctx, url)
})
}
// 全タスクの完了を待ち、最初のエラーを返す
if err := g.Wait(); err != nil {
return fmt.Errorf("fetch failed: %w", err)
}
return nil
}
func fetch(ctx context.Context, url string) error {
// フェッチ処理(シミュレーション)
select {
case <-time.After(time.Second):
fmt.Printf("Fetched: %s\n", url)
return nil
case <-ctx.Done():
return ctx.Err()
}
}
利点:
- エラーハンドリングが簡潔
- 最初のエラーで全タスクをキャンセル
- コンテキスト伝播が自動
2. golang.org/x/sync/semaphore
重み付きセマフォの実装。
package main
import (
"context"
"fmt"
"golang.org/x/sync/semaphore"
"time"
)
func demonstrateSemaphore() {
// 最大3並行
sem := semaphore.NewWeighted(3)
ctx := context.Background()
for i := 0; i < 10; i++ {
// セマフォを取得(重み1)
if err := sem.Acquire(ctx, 1); err != nil {
fmt.Printf("セマフォ取得失敗: %v\n", err)
break
}
go func(id int) {
defer sem.Release(1) // セマフォを解放
fmt.Printf("タスク %d 開始\n", id)
time.Sleep(time.Second)
fmt.Printf("タスク %d 完了\n", id)
}(i)
}
// 全タスクの完了を待つ
sem.Acquire(ctx, 3) // 全セマフォを取得(全タスク完了)
}
用途:
- リソース使用量の制限
- レート制限
- 接続プールの管理
3. golang.org/x/sync/singleflight
同じキーに対する重複リクエストを抑制。
package main
import (
"fmt"
"golang.org/x/sync/singleflight"
"sync"
"time"
)
var group singleflight.Group
func expensiveOperation(key string) (interface{}, error) {
// 重い処理をシミュレート
time.Sleep(time.Second)
return fmt.Sprintf("result for %s", key), nil
}
func demonstrateSingleflight() {
var wg sync.WaitGroup
// 10個の同時リクエスト(同じキー)
for i := 0; i < 10; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
// 同じキーに対するリクエストは1回のみ実行される
result, err, shared := group.Do("mykey", func() (interface{}, error) {
return expensiveOperation("mykey")
})
fmt.Printf("Goroutine %d: result=%v, err=%v, shared=%v\n",
id, result, err, shared)
}(i)
}
wg.Wait()
}
出力例:
Goroutine 3: result=result for mykey, err=<nil>, shared=true
Goroutine 5: result=result for mykey, err=<nil>, shared=true
Goroutine 1: result=result for mykey, err=<nil>, shared=true
...
全てのgoroutineが1回の実行結果を共有します(shared=true)。
用途:
- キャッシュの stampeding herd 問題の解決
- API呼び出しの重複防止
- データベースクエリの最適化
4. 分散システムでの並行処理
分散トレーシング
import (
"context"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"
)
func distributedTask(ctx context.Context) {
tracer := otel.Tracer("my-service")
// スパンを作成
ctx, span := tracer.Start(ctx, "distributedTask")
defer span.End()
// 子タスクを並行実行
var wg sync.WaitGroup
for i := 0; i < 5; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
childTask(ctx, id) // コンテキストを伝播
}(i)
}
wg.Wait()
}
func childTask(ctx context.Context, id int) {
tracer := otel.Tracer("my-service")
// 親スパンから子スパンを作成
_, span := tracer.Start(ctx, fmt.Sprintf("childTask-%d", id))
defer span.End()
// 処理
time.Sleep(time.Millisecond * 100)
}
利点:
- 分散システム全体のリクエストフローを可視化
- パフォーマンスボトルネックの特定
- エラーの追跡
5. ランタイムの内部動作
スケジューラの調整
package main
import (
"fmt"
"runtime"
"runtime/debug"
)
func tuneRuntime() {
// GCの頻度を調整
debug.SetGCPercent(200) // デフォルトは100
// 最大OSスレッド数を制限
debug.SetMaxThreads(1000) // デフォルトは10000
// メモリ上限を設定
debug.SetMemoryLimit(1024 * 1024 * 1024) // 1GB
// 統計情報を取得
var m runtime.MemStats
runtime.ReadMemStats(&m)
fmt.Printf("Alloc = %v MB\n", m.Alloc/1024/1024)
fmt.Printf("TotalAlloc = %v MB\n", m.TotalAlloc/1024/1024)
fmt.Printf("Sys = %v MB\n", m.Sys/1024/1024)
fmt.Printf("NumGC = %v\n", m.NumGC)
fmt.Printf("NumGoroutine = %v\n", runtime.NumGoroutine())
}
スケジューラトレース
# スケジューラの動作を詳細にログ出力
GODEBUG=schedtrace=1000 go run main.go
# 出力例:
# SCHED 0ms: gomaxprocs=8 idleprocs=0 threads=10 spinningthreads=0 idlethreads=3 runqueue=0 [0 0 0 0 0 0 0 0]
読み方:
gomaxprocs=8: Pの数(論理プロセッサ)idleprocs=0: アイドル状態のPthreads=10: OSスレッド数runqueue=0: グローバルランキューのG数[0 0 0 0 0 0 0 0]: 各Pのローカルキューのサイズ
---
ビジュアル図解
G-M-Pモデルの図解
┌─────────────────────────────────────────────────────┐
│ Go Runtime │
│ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ P0 │ │ P1 │ │ P2 │ ... │
│ │ │ │ │ │ │ │
│ │ [G1, G2] │ │ [G3, G4] │ │ [G5, G6] │ Local │
│ │ Queue │ │ Queue │ │ Queue │ Run │
│ └────┬─────┘ └────┬─────┘ └────┬─────┘ Queues │
│ │ │ │ │
│ │ │ │ │
│ ┌────▼─────┐ ┌───▼──────┐ ┌───▼──────┐ │
│ │ M0 │ │ M1 │ │ M2 │ ... │
│ │ OS Thread│ │ OS Thread│ │ OS Thread│ │
│ └────┬─────┘ └────┬─────┘ └────┬─────┘ │
└───────┼─────────────┼─────────────┼─────────────────┘
│ │ │
┌───────▼─────────────▼─────────────▼─────────────────┐
│ Operating System │
│ (Physical CPU Cores) │
└─────────────────────────────────────────────────────┘
Work Stealing:
P0 [G1, G2, G3, G4] ────┐
│ 盗む
P1 [] ←────┘ [G3, G4]
データフローの可視化
パイプラインパターン:
Input → [Generate] → ch1 → [Filter] → ch2 → [Transform] → ch3 → Output
↑ ↑ ↑ ↑
Goroutine Goroutine Goroutine Goroutine
1 2 3 4
ファンアウト・ファンイン:
┌─→ [Worker 1] ─┐
│ │
Input → [Dispatch] ─┼─→ [Worker 2] ─┼→ [Merge] → Output
│ │
└─→ [Worker 3] ─┘
チャネルの動作
バッファなしチャネル:
Sender Receiver
│ │
├──→ データ送信 ───────┤ ← ブロック(受信待ち)
│ (ブロック) │
│ ├──→ データ受信
├──→ 送信完了 ←────────┤
│ │
バッファ付きチャネル (cap=2):
Sender Buffer [_, _] Receiver
│ │
├──→ データ1送信 ──→ [1, _] │
│ (ブロックしない) │
│ │
├──→ データ2送信 ──→ [1, 2] │
│ (ブロックしない) │
│ │
├──→ データ3送信 ──→ [1, 2] │
│ (ブロック) ↑ 満杯 │
│ │
│ [1, 2] ←──────────────┤ 受信
│ [2, _] │
├──→ 送信完了 ──────→ [2, 3] │
---
Self-Check Questions
理解度を確認するための質問です。
基礎レベル
A: goroutineはOSスレッドより軽量(2KB vs 1-2MB)で、Goのランタイムによって管理されます。OSスレッドはカーネルによって管理され、コンテキストスイッチのコストが高いです。
sync.WaitGroupのAdd、Done、Waitはいつ使いますか? A:
- Add(n): goroutine起動前にカウンタを増やす
- Done(): goroutine終了時にカウンタを減らす(通常はdeferで)
- Wait(): 全goroutineの完了を待機
- Q: チャネルをクローズするのは送信側と受信側のどちらですか?
- Q:
sync.Mutexとsync.RWMutexの違いは何ですか?どちらを選ぶべきですか?
A: 送信側がクローズします。受信側がクローズすると、他の受信者がパニックする可能性があります。
中級レベル
A:
- Mutex: 読み書き両方をロック
- RWMutex: 読み取りは複数同時可能、書き込みは排他的
- 選択基準: 読み取りが多い場合はRWMutex、書き込みが多い場合はMutex(オーバーヘッドが少ない)
- Q: データ競合を検出する方法は?
- Q: バッファ付きチャネルとバッファなしチャネルの違いは?
A: go run -raceまたはgo test -raceフラグを使用します。これはGoのrace detectorを有効にします。
A: - バッファなし: 同期的通信(送受信が同時に発生) - バッファ付き: 非同期的通信(バッファが満杯になるまで送信側はブロックされない)
上級レベル
- Q: Goのスケジューラの Work Stealing とは何ですか?
- Q: Amdahlの法則が示す並行処理の限界とは?
- Q:
context.Contextの主な用途は?
A: あるP(論理プロセッサ)のローカルキューが空になったとき、他のPのキューから実行可能なgoroutineを「盗む」メカニズム。負荷分散を実現します。
A: プログラムの逐次的部分が並列化のボトルネックとなり、プロセッサ数を無限に増やしても、スピードアップは逐次的部分の逆数(1/S)が上限となる。
A: - キャンセルシグナルの伝播 - タイムアウトの設定 - リクエストスコープの値の伝達 - goroutine間の協調的な終了
- Q: どのような場合にgoroutineリークが発生しますか?
A:
- チャネルからの受信を永遠に待ち続ける
- selectでdefaultやタイムアウトがない
- context.Done()を監視していない
- 無限ループから抜け出せない
---
Production Considerations
本番環境で並行処理を使用する際の重要な考慮事項です。
1. リソース制限
Goroutineの数を制限する
type ResourceLimiter struct {
sem chan struct{} // セマフォ
}
func NewResourceLimiter(max int) *ResourceLimiter {
return &ResourceLimiter{
sem: make(chan struct{}, max),
}
}
func (rl *ResourceLimiter) Acquire() {
rl.sem <- struct{}{}
}
func (rl *ResourceLimiter) Release() {
<-rl.sem
}
// 使用例
func processWithLimit(items []Item) {
limiter := NewResourceLimiter(100) // 最大100並行
var wg sync.WaitGroup
for _, item := range items {
limiter.Acquire() // セマフォ取得
wg.Add(1)
go func(i Item) {
defer wg.Done()
defer limiter.Release() // セマフォ解放
process(i)
}(item)
}
wg.Wait()
}
理由: 無制限にgoroutineを作成すると、メモリ枯渇やCPU競合が発生します。
2. タイムアウトの設定
func withTimeout(f func() error, timeout time.Duration) error {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
errCh := make(chan error, 1)
go func() {
errCh <- f()
}()
select {
case err := <-errCh:
return err
case <-ctx.Done():
return fmt.Errorf("operation timeout: %w", ctx.Err())
}
}
理由: タイムアウトがないと、障害時に永遠に待ち続ける可能性があります。
3. パニックからの回復
func safeGoroutine(f func()) {
go func() {
defer func() {
if r := recover(); r != nil {
// スタックトレースをログに記録
buf := make([]byte, 1<<16)
stackSize := runtime.Stack(buf, false)
log.Printf("PANIC: %v\nStack trace:\n%s", r, buf[:stackSize])
}
}()
f() // 実際の処理
}()
}
理由: 1つのgoroutineのパニックがシステム全体をダウンさせる可能性があります。
4. メトリクスと監視
type Metrics struct {
activeGoroutines int64
totalProcessed int64
totalErrors int64
processingTimeMs int64
}
var globalMetrics Metrics
func recordMetrics() {
atomic.AddInt64(&globalMetrics.activeGoroutines, 1)
defer atomic.AddInt64(&globalMetrics.activeGoroutines, -1)
start := time.Now()
defer func() {
duration := time.Since(start).Milliseconds()
atomic.AddInt64(&globalMetrics.processingTimeMs, duration)
}()
// 処理
if err := doSomething(); err != nil {
atomic.AddInt64(&globalMetrics.totalErrors, 1)
} else {
atomic.AddInt64(&globalMetrics.totalProcessed, 1)
}
}
// Prometheusなどに定期的にエクスポート
func exportMetrics() {
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()
for range ticker.C {
active := atomic.LoadInt64(&globalMetrics.activeGoroutines)
processed := atomic.LoadInt64(&globalMetrics.totalProcessed)
errors := atomic.LoadInt64(&globalMetrics.totalErrors)
avgTime := atomic.LoadInt64(&globalMetrics.processingTimeMs)
log.Printf("Metrics: active=%d, processed=%d, errors=%d, avg_time=%dms",
active, processed, errors, avgTime)
}
}
理由: 本番環境では、システムの健全性を監視する必要があります。
5. Graceful Shutdownのベストプラクティス
type Server struct {
shutdownCh chan struct{}
wg sync.WaitGroup
}
func (s *Server) Start() {
// ワーカーを起動
for i := 0; i < 10; i++ {
s.wg.Add(1)
go s.worker(i)
}
}
func (s *Server) worker(id int) {
defer s.wg.Done()
for {
select {
case <-s.shutdownCh:
log.Printf("Worker %d: シャットダウンシグナル受信\n", id)
return
default:
// 通常の処理
s.doWork(id)
}
}
}
func (s *Server) Shutdown(timeout time.Duration) error {
log.Println("シャットダウン開始...")
// 1. 新規リクエストの受付を停止
close(s.shutdownCh)
// 2. 進行中の処理の完了を待つ(タイムアウト付き)
done := make(chan struct{})
go func() {
s.wg.Wait()
close(done)
}()
select {
case <-done:
log.Println("全ワーカーが正常に終了しました")
return nil
case <-time.After(timeout):
log.Println("シャットダウンタイムアウト")
return fmt.Errorf("shutdown timeout after %v", timeout)
}
}
// OSシグナルとの統合
func main() {
server := &Server{
shutdownCh: make(chan struct{}),
}
server.Start()
// OSシグナルを待機
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM)
<-sigCh
log.Println("シャットダウンシグナル受信")
if err := server.Shutdown(30 * time.Second); err != nil {
log.Printf("シャットダウンエラー: %v\n", err)
os.Exit(1)
}
log.Println("正常終了")
}
6. デバッグとプロファイリング
import _ "net/http/pprof"
func enableProfiling() {
go func() {
// pprofエンドポイントを公開
log.Println(http.ListenAndServe("localhost:6060", nil))
}()
}
// 使用方法:
// go tool pprof http://localhost:6060/debug/pprof/goroutine
// go tool pprof http://localhost:6060/debug/pprof/heap
// go tool pprof http://localhost:6060/debug/pprof/profile?seconds=30
重要なプロファイル:
goroutine: goroutineリークの検出heap: メモリリークの検出cpu: CPUボトルネックの特定mutex: ロック競合の分析- CSPの思想を理解する: 共有メモリではなく通信で協調
- G-M-Pモデルを知る: Goのスケジューラの仕組みを理解
- 並行と並列の違い: 概念を正しく理解する
- 適切な同期プリミティブ: Mutex、RWMutex、atomic、channelを使い分ける
- リソース管理: goroutineリークを防ぎ、適切に制限する
- エラーハンドリング: パニック回復、タイムアウト、キャンセル
- 監視と最適化: メトリクス収集、プロファイリング
- Day 5: チャネルを使った高度な並行処理パターン
- 実践: 実際のプロジェクトで並行処理を適用
- 発展学習: 分散システム、マイクロサービスでの並行処理
---
まとめ
並行処理は強力ですが、適切に使用する必要があります:
重要な原則
次のステップ
並行処理をマスターすることで、高性能で拡張性の高いGoアプリケーションを構築できるようになります。