Day 4: 並行処理の基礎 - 解答例
目次
- 並行処理アプローチの比較
- チャレンジ1: 並行ダウンローダー
- チャレンジ2: TTLキャッシュ
- チャレンジ3: ワーカープール
- Optimization Path: 進化の過程
- Common Mistakes: よくある間違い
- パフォーマンス分析
- プロダクションレディコード
---
並行処理アプローチの比較
並行処理を実装する際には、複数のアプローチがあります。それぞれのトレードオフを理解することが重要です。
アプローチ1: goroutineとsync.WaitGroup
長所:
- シンプルで理解しやすい
- 固定数のタスクに最適
- オーバーヘッドが小さい
短所:
- エラーハンドリングが難しい
- 動的なタスク追加に対応しにくい
- タイムアウトの実装が煩雑
ベストユースケース: 既知の固定数のタスクを並行実行する場合
アプローチ2: channelベース
長所:
- エラーハンドリングが自然
- 動的なタスク追加が容易
- ストリーミング処理に最適
短所:
- channelのバッファリングを考慮する必要がある
- デッドロックのリスク
- やや複雑
ベストユースケース: プロデューサー・コンシューマーパターン、ストリーミング処理
アプローチ3: worker poolパターン
長所:
- リソース使用量を制御できる
- スケーラビリティが高い
- 本番環境に最適
短所:
- 実装が複雑
- オーバーヘッドが大きい
ベストユースケース: 大量のタスク処理、リソース制限が必要な場合
---
チャレンジ1: 並行ダウンローダー
解法1: 基本的なgoroutineとWaitGroup
package main
import (
"fmt"
"sync"
"time"
)
// Result はダウンロード結果を表す構造体
type Result struct {
URL string // ダウンロード対象のURL
Size int // ダウンロードしたバイト数
Error error // エラーがあればここに格納
}
// download は指定されたURLをダウンロードする(シミュレーション)
// url: ダウンロード対象のURL
// results: 結果を送信するチャネル
// wg: goroutineの完了を通知するWaitGroup
func download(url string, results chan<- Result, wg *sync.WaitGroup) {
defer wg.Done() // goroutine終了時にWaitGroupのカウンタをデクリメント
// ダウンロード処理をシミュレート
time.Sleep(time.Millisecond * 100) // 実際のネットワーク遅延を模倣
// 結果をチャネルに送信
// ここではURLの長さを基にサイズを計算(実際はHTTP Content-Lengthを使用)
results <- Result{
URL: url,
Size: len(url) * 100, // 仮のサイズ計算
Error: nil,
}
}
func main() {
// ダウンロード対象のURL一覧
urls := []string{
"https://example.com/file1.zip",
"https://example.com/file2.zip",
"https://example.com/file3.zip",
"https://example.com/file4.zip",
"https://example.com/file5.zip",
}
// バッファ付きチャネルを作成(全URL数分のバッファ)
// バッファを持つことで、送信側がブロックされずに処理を続行できる
results := make(chan Result, len(urls))
// WaitGroupを初期化
var wg sync.WaitGroup
// 各URLに対してgoroutineを起動
for _, url := range urls {
wg.Add(1) // WaitGroupのカウンタをインクリメント
go download(url, results, &wg) // 非同期にダウンロードを実行
}
// 全てのgoroutineが完了するまで待機
wg.Wait()
close(results) // 全ての結果が送信されたのでチャネルをクローズ
// 結果を集計
var totalSize int
for result := range results {
if result.Error != nil {
fmt.Printf("エラー %s: %v\n", result.URL, result.Error)
} else {
fmt.Printf("完了 %s: %d bytes\n", result.URL, result.Size)
totalSize += result.Size
}
}
fmt.Printf("\n合計ダウンロード: %d bytes\n", totalSize)
}
解法2: channelベースアプローチ
package main
import (
"context"
"fmt"
"time"
)
// DownloadTask はダウンロードタスクを表す
type DownloadTask struct {
URL string
}
// DownloadResult はダウンロード結果を表す
type DownloadResult struct {
URL string
Size int
Duration time.Duration
Error error
}
// downloadWorker はチャネルからタスクを受け取り、ダウンロードを実行する
// ctx: キャンセル制御用のコンテキスト
// tasks: 受信専用のタスクチャネル
// results: 送信専用の結果チャネル
func downloadWorker(ctx context.Context, tasks <-chan DownloadTask, results chan<- DownloadResult) {
for task := range tasks { // tasksチャネルがクローズされるまでループ
select {
case <-ctx.Done(): // コンテキストがキャンセルされた場合
return
default:
// ダウンロード処理を実行
start := time.Now()
// 実際のダウンロード処理(ここではシミュレーション)
time.Sleep(time.Millisecond * 200)
// 結果を送信
results <- DownloadResult{
URL: task.URL,
Size: len(task.URL) * 100,
Duration: time.Since(start),
Error: nil,
}
}
}
}
func channelBasedDownload(urls []string, numWorkers int) []DownloadResult {
// タイムアウト付きコンテキストを作成
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel() // 関数終了時にリソースをクリーンアップ
// タスクと結果のチャネルを作成
tasks := make(chan DownloadTask, len(urls)) // タスク用チャネル
results := make(chan DownloadResult, len(urls)) // 結果用チャネル
// ワーカーgoroutineを起動
for i := 0; i < numWorkers; i++ {
go downloadWorker(ctx, tasks, results) // 複数のワーカーを並行実行
}
// タスクをチャネルに送信
for _, url := range urls {
tasks <- DownloadTask{URL: url}
}
close(tasks) // 全タスク送信完了
// 結果を収集
downloadResults := make([]DownloadResult, 0, len(urls))
for i := 0; i < len(urls); i++ {
select {
case result := <-results:
downloadResults = append(downloadResults, result)
case <-ctx.Done(): // タイムアウト発生
fmt.Println("タイムアウトしました")
return downloadResults
}
}
return downloadResults
}
func main() {
urls := []string{
"https://example.com/file1.zip",
"https://example.com/file2.zip",
"https://example.com/file3.zip",
"https://example.com/file4.zip",
"https://example.com/file5.zip",
}
// 3つのワーカーで並行ダウンロード
results := channelBasedDownload(urls, 3)
// 結果を表示
for _, result := range results {
if result.Error != nil {
fmt.Printf("エラー %s: %v\n", result.URL, result.Error)
} else {
fmt.Printf("完了 %s: %d bytes (所要時間: %v)\n",
result.URL, result.Size, result.Duration)
}
}
}
解法3: worker poolパターン
この解法は最も堅牢で、本番環境に適しています。
package main
import (
"context"
"fmt"
"sync"
"time"
)
// Job はワーカープールで処理するジョブのインターフェース
type Job interface {
Execute(ctx context.Context) (interface{}, error)
}
// DownloadJob はダウンロードジョブの実装
type DownloadJob struct {
URL string
}
// Execute はジョブを実行する
func (j *DownloadJob) Execute(ctx context.Context) (interface{}, error) {
start := time.Now()
// ダウンロード処理(シミュレーション)
select {
case <-time.After(time.Millisecond * 150):
// 成功
return DownloadResult{
URL: j.URL,
Size: len(j.URL) * 100,
Duration: time.Since(start),
}, nil
case <-ctx.Done():
// キャンセル
return nil, ctx.Err()
}
}
// WorkerPool はワーカープールの実装
type WorkerPool struct {
workers int // ワーカー数
jobs chan Job // ジョブキュー
results chan interface{} // 結果チャネル
errors chan error // エラーチャネル
wg sync.WaitGroup // 完了待機用
ctx context.Context // キャンセル制御用
cancelFunc context.CancelFunc // キャンセル関数
}
// NewWorkerPool は新しいワーカープールを作成
func NewWorkerPool(workers int) *WorkerPool {
ctx, cancel := context.WithCancel(context.Background())
return &WorkerPool{
workers: workers,
jobs: make(chan Job, workers*2), // バッファサイズはワーカー数の2倍
results: make(chan interface{}, workers*2),
errors: make(chan error, workers*2),
ctx: ctx,
cancelFunc: cancel,
}
}
// Start はワーカープールを開始
func (wp *WorkerPool) Start() {
for i := 0; i < wp.workers; i++ {
wp.wg.Add(1)
go wp.worker(i) // 各ワーカーをgoroutineとして起動
}
}
// worker は個々のワーカーの処理ループ
func (wp *WorkerPool) worker(id int) {
defer wp.wg.Done() // ワーカー終了時にWaitGroupをデクリメント
fmt.Printf("Worker %d: 起動しました\n", id)
for {
select {
case job, ok := <-wp.jobs:
if !ok {
// ジョブチャネルがクローズされた
fmt.Printf("Worker %d: 終了します\n", id)
return
}
fmt.Printf("Worker %d: ジョブを処理中\n", id)
// ジョブを実行
result, err := job.Execute(wp.ctx)
if err != nil {
// エラーが発生した場合
select {
case wp.errors <- err:
case <-wp.ctx.Done():
return
}
} else {
// 成功した場合
select {
case wp.results <- result:
case <-wp.ctx.Done():
return
}
}
case <-wp.ctx.Done():
// コンテキストがキャンセルされた
fmt.Printf("Worker %d: キャンセルされました\n", id)
return
}
}
}
// Submit はジョブをワーカープールに投入
func (wp *WorkerPool) Submit(job Job) error {
select {
case wp.jobs <- job:
return nil
case <-wp.ctx.Done():
return fmt.Errorf("worker pool is closed")
}
}
// Results は結果チャネルを返す
func (wp *WorkerPool) Results() <-chan interface{} {
return wp.results
}
// Errors はエラーチャネルを返す
func (wp *WorkerPool) Errors() <-chan error {
return wp.errors
}
// Shutdown はワーカープールを安全にシャットダウン
func (wp *WorkerPool) Shutdown(timeout time.Duration) error {
// ジョブチャネルをクローズ(新規ジョブを受け付けない)
close(wp.jobs)
// タイムアウト付きで全ワーカーの完了を待機
done := make(chan struct{})
go func() {
wp.wg.Wait() // 全ワーカーの完了を待機
close(wp.results) // 結果チャネルをクローズ
close(wp.errors) // エラーチャネルをクローズ
close(done)
}()
select {
case <-done:
// 正常にシャットダウン完了
fmt.Println("正常にシャットダウンしました")
return nil
case <-time.After(timeout):
// タイムアウト発生 - 強制終了
wp.cancelFunc() // コンテキストをキャンセルして全ワーカーを停止
fmt.Println("タイムアウト - 強制終了しました")
return fmt.Errorf("shutdown timeout")
}
}
func main() {
// 5つのワーカーを持つプールを作成
pool := NewWorkerPool(5)
pool.Start()
// ダウンロードジョブを投入
urls := []string{
"https://example.com/file1.zip",
"https://example.com/file2.zip",
"https://example.com/file3.zip",
"https://example.com/file4.zip",
"https://example.com/file5.zip",
"https://example.com/file6.zip",
"https://example.com/file7.zip",
"https://example.com/file8.zip",
"https://example.com/file9.zip",
"https://example.com/file10.zip",
}
// ジョブを非同期に投入
go func() {
for _, url := range urls {
if err := pool.Submit(&DownloadJob{URL: url}); err != nil {
fmt.Printf("ジョブ投入エラー: %v\n", err)
return
}
}
}()
// 結果とエラーを収集
var completedCount int
var errorCount int
// 別goroutineでエラーを監視
go func() {
for err := range pool.Errors() {
fmt.Printf("エラー: %v\n", err)
errorCount++
}
}()
// 結果を収集
for result := range pool.Results() {
if dr, ok := result.(DownloadResult); ok {
fmt.Printf("完了: %s (%d bytes, %v)\n", dr.URL, dr.Size, dr.Duration)
completedCount++
// 全ジョブが完了したらシャットダウン
if completedCount == len(urls) {
pool.Shutdown(5 * time.Second)
break
}
}
}
fmt.Printf("\n完了: %d件, エラー: %d件\n", completedCount, errorCount)
}
チャレンジ2: TTLキャッシュ
基本実装
シンプルな実装から始めて、段階的に機能を追加していきます。
package main
import (
"sync"
"time"
)
// CacheItem はキャッシュアイテムを表す
type CacheItem struct {
Value string // 保存する値
ExpiresAt time.Time // 有効期限
}
// TTLCache はTTL(Time To Live)付きキャッシュ
type TTLCache struct {
mu sync.RWMutex // 読み書きロック
items map[string]CacheItem // アイテムを格納するマップ
}
// NewTTLCache は新しいTTLキャッシュを作成
func NewTTLCache() *TTLCache {
return &TTLCache{
items: make(map[string]CacheItem), // マップを初期化
}
}
// Get はキーに対応する値を取得
func (c *TTLCache) Get(key string) (string, bool) {
c.mu.RLock() // 読み取りロックを取得
defer c.mu.RUnlock() // 関数終了時にアンロック
item, ok := c.items[key] // マップから取得
if !ok {
return "", false // キーが存在しない
}
// 有効期限をチェック
if time.Now().After(item.ExpiresAt) {
return "", false // 期限切れ
}
return item.Value, true // 値を返す
}
// Set はキーと値を設定(TTL付き)
func (c *TTLCache) Set(key, value string, ttl time.Duration) {
c.mu.Lock() // 書き込みロックを取得
defer c.mu.Unlock() // 関数終了時にアンロック
c.items[key] = CacheItem{
Value: value,
ExpiresAt: time.Now().Add(ttl), // 有効期限を設定
}
}
// Delete はキーを削除
func (c *TTLCache) Delete(key string) {
c.mu.Lock()
defer c.mu.Unlock()
delete(c.items, key)
}
プロダクショングレードの完全実装
package main
import (
"context"
"fmt"
"sync"
"time"
)
// CacheEntry はキャッシュのエントリ
type CacheEntry struct {
Value interface{}
ExpiresAt time.Time
AccessedAt time.Time
}
// ProductionTTLCache はプロダクションレベルのTTLキャッシュ
type ProductionTTLCache struct {
mu sync.RWMutex
items map[string]*CacheEntry
maxSize int
defaultTTL time.Duration
cleanupInterval time.Duration
stats CacheStats
stopCh chan struct{}
wg sync.WaitGroup
}
// CacheStats はキャッシュの統計情報
type CacheStats struct {
mu sync.RWMutex
hits int64
misses int64
evictions int64
expirations int64
}
// NewProductionTTLCache は新しいキャッシュを作成
func NewProductionTTLCache(maxSize int, defaultTTL, cleanupInterval time.Duration) *ProductionTTLCache {
cache := &ProductionTTLCache{
items: make(map[string]*CacheEntry),
maxSize: maxSize,
defaultTTL: defaultTTL,
cleanupInterval: cleanupInterval,
stopCh: make(chan struct{}),
}
// バックグラウンドクリーンアップを開始
cache.wg.Add(1)
go cache.cleanupLoop()
return cache
}
// Get はキーに対応する値を取得
func (c *ProductionTTLCache) Get(key string) (interface{}, bool) {
c.mu.RLock()
entry, exists := c.items[key]
c.mu.RUnlock()
if !exists {
c.stats.recordMiss()
return nil, false
}
if time.Now().After(entry.ExpiresAt) {
c.stats.recordMiss()
c.Delete(key)
return nil, false
}
// アクセス時刻を更新(LRU用)
c.mu.Lock()
entry.AccessedAt = time.Now()
c.mu.Unlock()
c.stats.recordHit()
return entry.Value, true
}
// Set はキーと値を設定
func (c *ProductionTTLCache) Set(key string, value interface{}) {
c.SetWithTTL(key, value, c.defaultTTL)
}
// SetWithTTL はカスタムTTLでキーと値を設定
func (c *ProductionTTLCache) SetWithTTL(key string, value interface{}, ttl time.Duration) {
c.mu.Lock()
defer c.mu.Unlock()
// サイズ制限チェック
if len(c.items) >= c.maxSize {
c.evictLRU()
}
c.items[key] = &CacheEntry{
Value: value,
ExpiresAt: time.Now().Add(ttl),
AccessedAt: time.Now(),
}
}
// Delete はキーを削除
func (c *ProductionTTLCache) Delete(key string) {
c.mu.Lock()
defer c.mu.Unlock()
delete(c.items, key)
}
// Clear は全てのエントリを削除
func (c *ProductionTTLCache) Clear() {
c.mu.Lock()
defer c.mu.Unlock()
c.items = make(map[string]*CacheEntry)
}
// Size は現在のエントリ数を返す
func (c *ProductionTTLCache) Size() int {
c.mu.RLock()
defer c.mu.RUnlock()
return len(c.items)
}
// evictLRU はLRU(最も古いアクセス)のエントリを削除
func (c *ProductionTTLCache) evictLRU() {
var oldestKey string
var oldestTime time.Time
for key, entry := range c.items {
if oldestKey == "" || entry.AccessedAt.Before(oldestTime) {
oldestKey = key
oldestTime = entry.AccessedAt
}
}
if oldestKey != "" {
delete(c.items, oldestKey)
c.stats.recordEviction()
}
}
// cleanupLoop は期限切れエントリを定期的にクリーンアップ
func (c *ProductionTTLCache) cleanupLoop() {
defer c.wg.Done()
ticker := time.NewTicker(c.cleanupInterval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
c.cleanup()
case <-c.stopCh:
return
}
}
}
// cleanup は期限切れエントリを削除
func (c *ProductionTTLCache) cleanup() {
c.mu.Lock()
defer c.mu.Unlock()
now := time.Now()
for key, entry := range c.items {
if now.After(entry.ExpiresAt) {
delete(c.items, key)
c.stats.recordExpiration()
}
}
}
// Close はキャッシュをクローズ
func (c *ProductionTTLCache) Close() {
close(c.stopCh)
c.wg.Wait()
}
// GetStats は統計情報を取得
func (c *ProductionTTLCache) GetStats() (hits, misses, evictions, expirations int64) {
return c.stats.get()
}
// CacheStats のメソッド
func (s *CacheStats) recordHit() {
s.mu.Lock()
defer s.mu.Unlock()
s.hits++
}
func (s *CacheStats) recordMiss() {
s.mu.Lock()
defer s.mu.Unlock()
s.misses++
}
func (s *CacheStats) recordEviction() {
s.mu.Lock()
defer s.mu.Unlock()
s.evictions++
}
func (s *CacheStats) recordExpiration() {
s.mu.Lock()
defer s.mu.Unlock()
s.expirations++
}
func (s *CacheStats) get() (hits, misses, evictions, expirations int64) {
s.mu.RLock()
defer s.mu.RUnlock()
return s.hits, s.misses, s.evictions, s.expirations
}
// 使用例
func demonstrateTTLCache() {
cache := NewProductionTTLCache(
100, // 最大100エントリ
5*time.Second, // デフォルトTTL
1*time.Second, // クリーンアップ間隔
)
defer cache.Close()
// データを設定
cache.Set("user:1", map[string]string{"name": "太郎"})
cache.SetWithTTL("session:abc", "active", 10*time.Second)
// データを取得
if value, ok := cache.Get("user:1"); ok {
fmt.Printf("取得成功: %v\n", value)
}
// 統計情報を表示
time.Sleep(2 * time.Second)
hits, misses, evictions, expirations := cache.GetStats()
fmt.Printf("ヒット: %d, ミス: %d, 削除: %d, 期限切れ: %d\n",
hits, misses, evictions, expirations)
}
---
Optimization Path: 進化の過程
並行処理の実装を段階的に改善していく過程を示します。
ステージ1: 素朴な実装
// 問題点:エラーハンドリングなし、タイムアウトなし、リソース制限なし
func naiveParallelProcess(items []string) {
for _, item := range items {
go func(i string) {
// 処理
process(i)
}(item)
}
// 完了を待たずに終了してしまう!
}
問題点:
- goroutineの完了を待たない
- エラーを捕捉できない
- 無制限にgoroutineを生成
ステージ2: WaitGroupでの完了待機
// 改善点:完了を正しく待機
func betterParallelProcess(items []string) {
var wg sync.WaitGroup // 完了を追跡
for _, item := range items {
wg.Add(1) // カウンタをインクリメント
go func(i string) {
defer wg.Done() // 必ず実行される
process(i)
}(item)
}
wg.Wait() // 全goroutineの完了を待機
}
改善点:
- 全goroutineの完了を正しく待機
defer wg.Done()でパニック時も確実にカウントダウン
残る問題点:
- エラーハンドリングがない
- タイムアウトがない
ステージ3: エラーハンドリング追加
func withErrorHandling(items []string) []error {
var (
wg sync.WaitGroup
mu sync.Mutex // エラースライス保護用
errors []error
)
for _, item := range items {
wg.Add(1)
go func(i string) {
defer wg.Done()
if err := process(i); err != nil {
mu.Lock() // ロックを取得
errors = append(errors, err) // スレッドセーフに追加
mu.Unlock() // ロックを解放
}
}(item)
}
wg.Wait()
return errors // 全エラーを返す
}
改善点:
- エラーを収集して返す
- Mutexで競合状態を防ぐ
ステージ4: タイムアウト追加
func withTimeout(items []string, timeout time.Duration) ([]error, error) {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel() // リソースを必ずクリーンアップ
var (
wg sync.WaitGroup
mu sync.Mutex
errors []error
)
for _, item := range items {
wg.Add(1)
go func(i string) {
defer wg.Done()
// コンテキストをprocessに渡す
if err := processWithContext(ctx, i); err != nil {
mu.Lock()
errors = append(errors, err)
mu.Unlock()
}
}(item)
}
// 完了またはタイムアウトを待機
done := make(chan struct{})
go func() {
wg.Wait()
close(done)
}()
select {
case <-done:
return errors, nil // 正常完了
case <-ctx.Done():
return errors, fmt.Errorf("timeout exceeded") // タイムアウト
}
}
改善点:
- コンテキストでタイムアウトを制御
- 処理が長引いてもタイムアウトで中断
ステージ5: Graceful Shutdown(本番品質)
type GracefulProcessor struct {
maxWorkers int
timeout time.Duration
sem chan struct{} // セマフォでワーカー数制限
}
func NewGracefulProcessor(maxWorkers int, timeout time.Duration) *GracefulProcessor {
return &GracefulProcessor{
maxWorkers: maxWorkers,
timeout: timeout,
sem: make(chan struct{}, maxWorkers), // バッファ付きチャネル
}
}
func (gp *GracefulProcessor) Process(items []string) ([]error, error) {
ctx, cancel := context.WithTimeout(context.Background(), gp.timeout)
defer cancel()
var (
wg sync.WaitGroup
mu sync.Mutex
errors []error
)
for _, item := range items {
// セマフォを取得(ワーカー数を制限)
select {
case gp.sem <- struct{}{}:
// セマフォ取得成功
case <-ctx.Done():
// タイムアウト
return errors, ctx.Err()
}
wg.Add(1)
go func(i string) {
defer func() {
<-gp.sem // セマフォを解放
wg.Done() // WaitGroupをデクリメント
}()
// パニックからの回復
defer func() {
if r := recover(); r != nil {
mu.Lock()
errors = append(errors, fmt.Errorf("panic: %v", r))
mu.Unlock()
}
}()
if err := processWithContext(ctx, i); err != nil {
mu.Lock()
errors = append(errors, err)
mu.Unlock()
}
}(item)
}
// Graceful shutdown
done := make(chan struct{})
go func() {
wg.Wait()
close(done)
}()
select {
case <-done:
return errors, nil
case <-ctx.Done():
// タイムアウト発生 - 進行中の処理の完了を待つ
<-done
return errors, fmt.Errorf("timeout: some tasks may be incomplete")
}
}
最終改善点:
- セマフォでワーカー数を制限(リソース保護)
- パニックからの回復
- Graceful shutdown(進行中の処理は完了させる)
- 本番環境で使用可能な品質
---
Common Mistakes: よくある間違い
間違い1: goroutineリーク
// 悪い例:goroutineがリークする
func badExample() {
ch := make(chan int) // バッファなし
go func() {
value := <-ch // 永遠に待ち続ける
fmt.Println(value)
}()
// chに送信せずに関数が終了
// goroutineは永遠に残る(リーク)
}
// 良い例:タイムアウトを設定
func goodExample() {
ch := make(chan int)
go func() {
select {
case value := <-ch:
fmt.Println(value)
case <-time.After(time.Second):
fmt.Println("タイムアウト")
return // goroutineが終了する
}
}()
time.Sleep(2 * time.Second)
}
間違い2: データ競合
// 悪い例:データ競合が発生
func badRace() {
counter := 0 // 共有変数
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
counter++ // 競合!複数goroutineが同時にアクセス
}()
}
wg.Wait()
fmt.Println(counter) // 1000にならない可能性が高い
}
// 良い例1:Mutexで保護
func goodMutex() {
counter := 0
var mu sync.Mutex
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
mu.Lock() // ロック取得
counter++ // クリティカルセクション
mu.Unlock() // ロック解放
}()
}
wg.Wait()
fmt.Println(counter) // 必ず1000
}
// 良い例2:atomic操作(最速)
func goodAtomic() {
var counter int64 // int64型
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
atomic.AddInt64(&counter, 1) // アトミックに加算
}()
}
wg.Wait()
fmt.Println(atomic.LoadInt64(&counter)) // 必ず1000
}
間違い3: デッドロック
// 悪い例:デッドロックが発生
func badDeadlock() {
var mu1, mu2 sync.Mutex
// goroutine 1
go func() {
mu1.Lock()
time.Sleep(time.Millisecond)
mu2.Lock() // goroutine 2がmu2を保持している
// デッドロック!
mu2.Unlock()
mu1.Unlock()
}()
// goroutine 2
go func() {
mu2.Lock()
time.Sleep(time.Millisecond)
mu1.Lock() // goroutine 1がmu1を保持している
// デッドロック!
mu1.Unlock()
mu2.Unlock()
}()
time.Sleep(time.Second)
}
// 良い例:ロックの順序を統一
func goodNoDeadlock() {
var mu1, mu2 sync.Mutex
// 常にmu1 → mu2の順でロック
lockBoth := func() {
mu1.Lock()
mu2.Lock()
}
unlockBoth := func() {
mu2.Unlock() // 逆順で解放
mu1.Unlock()
}
// goroutine 1
go func() {
lockBoth() // 同じ順序
// 処理
unlockBoth()
}()
// goroutine 2
go func() {
lockBoth() // 同じ順序
// 処理
unlockBoth()
}()
}
間違い4: バッファなしチャネルでのデッドロック
// 悪い例:送信側がブロック
func badChannelDeadlock() {
ch := make(chan int) // バッファなし
ch <- 42 // 受信側がいないのでブロック!
// デッドロック
value := <-ch
fmt.Println(value)
}
// 良い例1:goroutineで送信
func goodChannelGoroutine() {
ch := make(chan int)
go func() {
ch <- 42 // 別goroutineで送信
}()
value := <-ch // メインgoroutineで受信
fmt.Println(value)
}
// 良い例2:バッファ付きチャネル
func goodChannelBuffered() {
ch := make(chan int, 1) // バッファサイズ1
ch <- 42 // バッファに格納(ブロックしない)
value := <-ch
fmt.Println(value)
}
間違い5: ループ変数のキャプチャ
// 悪い例:全てのgoroutineが最後の値を使用
func badLoopCapture() {
var wg sync.WaitGroup
for i := 0; i < 5; i++ {
wg.Add(1)
go func() {
defer wg.Done()
fmt.Println(i) // 全て「5」を出力する可能性が高い
}()
}
wg.Wait()
}
// 良い例1:引数で渡す
func goodLoopParam() {
var wg sync.WaitGroup
for i := 0; i < 5; i++ {
wg.Add(1)
go func(n int) { // パラメータとして受け取る
defer wg.Done()
fmt.Println(n) // 正しい値が出力される
}(i) // 引数として渡す
}
wg.Wait()
}
// 良い例2:ローカル変数にコピー
func goodLoopCopy() {
var wg sync.WaitGroup
for i := 0; i < 5; i++ {
i := i // シャドーイング(ローカル変数にコピー)
wg.Add(1)
go func() {
defer wg.Done()
fmt.Println(i) // 正しい値
}()
}
wg.Wait()
}
---
パフォーマンス分析
ベンチマーク比較
package main
import (
"sync"
"sync/atomic"
"testing"
)
// ベンチマーク1: Mutex
func BenchmarkMutex(b *testing.B) {
var counter int
var mu sync.Mutex
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
mu.Lock()
counter++
mu.Unlock()
}
})
}
// ベンチマーク2: RWMutex(読み取り)
func BenchmarkRWMutexRead(b *testing.B) {
var counter int
var mu sync.RWMutex
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
mu.RLock()
_ = counter
mu.RUnlock()
}
})
}
// ベンチマーク3: RWMutex(書き込み)
func BenchmarkRWMutexWrite(b *testing.B) {
var counter int
var mu sync.RWMutex
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
mu.Lock()
counter++
mu.Unlock()
}
})
}
// ベンチマーク4: Atomic
func BenchmarkAtomic(b *testing.B) {
var counter int64
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
atomic.AddInt64(&counter, 1)
}
})
}
// ベンチマーク5: Channel
func BenchmarkChannel(b *testing.B) {
ch := make(chan int, 1)
ch <- 0
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
val := <-ch
val++
ch <- val
}
})
}
実行例:
go test -bench=. -benchmem -cpu=1,2,4,8
典型的な結果 (Apple M1, 8 cores):
BenchmarkMutex-8 47,234,156 25.4 ns/op 0 B/op 0 allocs/op
BenchmarkRWMutexRead-8 89,445,677 13.1 ns/op 0 B/op 0 allocs/op
BenchmarkRWMutexWrite-8 45,123,789 26.7 ns/op 0 B/op 0 allocs/op
BenchmarkAtomic-8 198,567,234 6.2 ns/op 0 B/op 0 allocs/op
BenchmarkChannel-8 10,234,567 156.0 ns/op 0 B/op 0 allocs/op
分析:
- Atomic: 最速(6.2 ns/op)- 単純なカウンターに最適
- RWMutex(読み取り): 読み取り専用なら高速(13.1 ns/op)
- Mutex/RWMutex(書き込み): 中程度(25-27 ns/op)
- Channel: 最遅(156 ns/op)- 通信には最適だがオーバーヘッドが大きい
並行度とパフォーマンスの関係
package main
import (
"fmt"
"runtime"
"sync"
"time"
)
func benchmarkWorkers(taskCount, workerCount int) time.Duration {
tasks := make(chan int, taskCount)
var wg sync.WaitGroup
// タスクを生成
for i := 0; i < taskCount; i++ {
tasks <- i
}
close(tasks)
start := time.Now()
// ワーカーを起動
for i := 0; i < workerCount; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for task := range tasks {
// CPU集約的な処理をシミュレート
_ = fibonacci(20)
_ = task
}
}()
}
wg.Wait()
return time.Since(start)
}
func fibonacci(n int) int {
if n <= 1 {
return n
}
return fibonacci(n-1) + fibonacci(n-2)
}
func main() {
runtime.GOMAXPROCS(runtime.NumCPU()) // CPUコア数を使用
taskCount := 1000
workerCounts := []int{1, 2, 4, 8, 16, 32, 64, 128}
fmt.Println("並行度とパフォーマンスの関係")
fmt.Println("タスク数:", taskCount)
fmt.Println("CPUコア数:", runtime.NumCPU())
fmt.Println()
for _, workers := range workerCounts {
duration := benchmarkWorkers(taskCount, workers)
fmt.Printf("ワーカー数: %3d -> 所要時間: %v\n", workers, duration)
}
}
典型的な出力 (8コアCPU):
並行度とパフォーマンスの関係
タスク数: 1000
CPUコア数: 8
ワーカー数: 1 -> 所要時間: 15.234s
ワーカー数: 2 -> 所要時間: 7.891s
ワーカー数: 4 -> 所要時間: 4.123s
ワーカー数: 8 -> 所要時間: 2.045s ← 最適(CPUコア数と一致)
ワーカー数: 16 -> 所要時間: 2.134s
ワーカー数: 32 -> 所要時間: 2.345s
ワーカー数: 64 -> 所要時間: 2.567s
ワーカー数: 128 -> 所要時間: 2.789s
結論:
- CPU集約的タスク:ワーカー数 = CPUコア数が最適
- I/O集約的タスク:より多くのワーカーが有効(待機時間が多いため)
---
プロダクションレディコード
本番環境で使用できる、完全な並行処理システムの実装例です。
package main
import (
"context"
"fmt"
"log"
"os"
"os/signal"
"sync"
"sync/atomic"
"syscall"
"time"
)
// Task は処理するタスクのインターフェース
type Task interface {
Execute(ctx context.Context) error
ID() string
}
// TaskProcessor はタスクを並行処理するシステム
type TaskProcessor struct {
workers int
queue chan Task
results chan Result
errors chan error
stats *Statistics
wg sync.WaitGroup
ctx context.Context
cancel context.CancelFunc
shutdownOnce sync.Once
}
// Result はタスクの実行結果
type Result struct {
TaskID string
StartTime time.Time
EndTime time.Time
Error error
}
// Statistics は処理統計
type Statistics struct {
totalTasks int64
completedTasks int64
failedTasks int64
activeWorkers int64
}
// NewTaskProcessor は新しいタスクプロセッサを作成
func NewTaskProcessor(workers, queueSize int) *TaskProcessor {
ctx, cancel := context.WithCancel(context.Background())
return &TaskProcessor{
workers: workers,
queue: make(chan Task, queueSize),
results: make(chan Result, queueSize),
errors: make(chan error, queueSize),
stats: &Statistics{},
ctx: ctx,
cancel: cancel,
}
}
// Start はワーカーを起動
func (tp *TaskProcessor) Start() {
log.Printf("タスクプロセッサを起動(ワーカー数: %d)\n", tp.workers)
for i := 0; i < tp.workers; i++ {
tp.wg.Add(1)
go tp.worker(i)
}
// 統計情報を定期的に出力
go tp.statsReporter()
}
// worker は個々のワーカーの処理ループ
func (tp *TaskProcessor) worker(id int) {
defer tp.wg.Done()
atomic.AddInt64(&tp.stats.activeWorkers, 1)
defer atomic.AddInt64(&tp.stats.activeWorkers, -1)
log.Printf("Worker %d: 起動\n", id)
for {
select {
case task, ok := <-tp.queue:
if !ok {
log.Printf("Worker %d: キューがクローズされました\n", id)
return
}
tp.processTask(id, task)
case <-tp.ctx.Done():
log.Printf("Worker %d: コンテキストがキャンセルされました\n", id)
return
}
}
}
// processTask はタスクを処理
func (tp *TaskProcessor) processTask(workerID int, task Task) {
startTime := time.Now()
defer func() {
if r := recover(); r != nil {
log.Printf("Worker %d: パニック回復 - タスクID: %s, エラー: %v\n",
workerID, task.ID(), r)
atomic.AddInt64(&tp.stats.failedTasks, 1)
tp.results <- Result{
TaskID: task.ID(),
StartTime: startTime,
EndTime: time.Now(),
Error: fmt.Errorf("panic: %v", r),
}
}
}()
log.Printf("Worker %d: タスク %s を処理中\n", workerID, task.ID())
// タスクを実行
err := task.Execute(tp.ctx)
endTime := time.Now()
duration := endTime.Sub(startTime)
if err != nil {
atomic.AddInt64(&tp.stats.failedTasks, 1)
log.Printf("Worker %d: タスク %s 失敗(所要時間: %v, エラー: %v)\n",
workerID, task.ID(), duration, err)
} else {
atomic.AddInt64(&tp.stats.completedTasks, 1)
log.Printf("Worker %d: タスク %s 完了(所要時間: %v)\n",
workerID, task.ID(), duration)
}
// 結果を送信
select {
case tp.results <- Result{
TaskID: task.ID(),
StartTime: startTime,
EndTime: endTime,
Error: err,
}:
case <-tp.ctx.Done():
return
}
}
// Submit はタスクをキューに追加
func (tp *TaskProcessor) Submit(task Task) error {
atomic.AddInt64(&tp.stats.totalTasks, 1)
select {
case tp.queue <- task:
return nil
case <-tp.ctx.Done():
return fmt.Errorf("プロセッサがシャットダウン中です")
default:
return fmt.Errorf("キューが満杯です")
}
}
// Results は結果チャネルを返す
func (tp *TaskProcessor) Results() <-chan Result {
return tp.results
}
// Shutdown はプロセッサを安全にシャットダウン
func (tp *TaskProcessor) Shutdown(timeout time.Duration) error {
var shutdownErr error
tp.shutdownOnce.Do(func() {
log.Println("シャットダウンを開始します...")
// 新規タスクの受付を停止
close(tp.queue)
// タイムアウト付きで完了を待機
done := make(chan struct{})
go func() {
tp.wg.Wait()
close(tp.results)
close(tp.errors)
close(done)
}()
select {
case <-done:
log.Println("正常にシャットダウンしました")
shutdownErr = nil
case <-time.After(timeout):
log.Println("シャットダウンタイムアウト - 強制終了します")
tp.cancel() // 強制キャンセル
<-done // それでも完了を待つ
shutdownErr = fmt.Errorf("shutdown timeout")
}
})
return shutdownErr
}
// GetStats は統計情報を取得
func (tp *TaskProcessor) GetStats() (total, completed, failed, active int64) {
return atomic.LoadInt64(&tp.stats.totalTasks),
atomic.LoadInt64(&tp.stats.completedTasks),
atomic.LoadInt64(&tp.stats.failedTasks),
atomic.LoadInt64(&tp.stats.activeWorkers)
}
// statsReporter は定期的に統計情報を出力
func (tp *TaskProcessor) statsReporter() {
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
total, completed, failed, active := tp.GetStats()
log.Printf("統計 - 総タスク数: %d, 完了: %d, 失敗: %d, アクティブワーカー: %d\n",
total, completed, failed, active)
case <-tp.ctx.Done():
return
}
}
}
// 使用例: シンプルなタスク実装
type SimpleTask struct {
id string
duration time.Duration
}
func (t *SimpleTask) ID() string {
return t.id
}
func (t *SimpleTask) Execute(ctx context.Context) error {
select {
case <-time.After(t.duration):
return nil
case <-ctx.Done():
return ctx.Err()
}
}
func main() {
// プロセッサを作成
processor := NewTaskProcessor(5, 100) // 5ワーカー、キューサイズ100
processor.Start()
// シグナルハンドリング
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)
go func() {
<-sigChan
log.Println("シグナル受信 - シャットダウンします")
processor.Shutdown(10 * time.Second)
os.Exit(0)
}()
// タスクを投入
for i := 0; i < 50; i++ {
task := &SimpleTask{
id: fmt.Sprintf("task-%d", i),
duration: time.Millisecond * time.Duration(100+i*10),
}
if err := processor.Submit(task); err != nil {
log.Printf("タスク投入失敗: %v\n", err)
}
}
// 結果を収集
go func() {
for result := range processor.Results() {
if result.Error != nil {
log.Printf("結果: %s - エラー: %v\n", result.TaskID, result.Error)
} else {
duration := result.EndTime.Sub(result.StartTime)
log.Printf("結果: %s - 成功(所要時間: %v)\n", result.TaskID, duration)
}
}
}()
// 10秒後にシャットダウン
time.Sleep(10 * time.Second)
if err := processor.Shutdown(5 * time.Second); err != nil {
log.Printf("シャットダウンエラー: %v\n", err)
}
// 最終統計を出力
total, completed, failed, _ := processor.GetStats()
log.Printf("最終統計 - 総タスク数: %d, 完了: %d, 失敗: %d\n",
total, completed, failed)
}
このプロダクションレディコードは以下の特徴を持っています:
---
まとめ
並行処理の実装では以下が重要です:
これらの原則を理解し実践することで、堅牢で高性能な並行処理システムを構築できます。
---
チャレンジ3の解答: 並行ダウンローダー
基本実装
package main
import (
"context"
"fmt"
"sync"
"time"
)
type DownloadResult struct {
URL string
Size int
Duration time.Duration
Error error
}
func download(ctx context.Context, url string) DownloadResult {
start := time.Now()
// ダウンロードをシミュレート
select {
case <-time.After(time.Millisecond * 500):
return DownloadResult{
URL: url,
Size: 1024 * 1024,
Duration: time.Since(start),
Error: nil,
}
case <-ctx.Done():
return DownloadResult{
URL: url,
Error: ctx.Err(),
Duration: time.Since(start),
}
}
}
func parallelDownload(urls []string, maxConcurrent int) []DownloadResult {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
results := make([]DownloadResult, len(urls))
sem := make(chan struct{}, maxConcurrent)
var wg sync.WaitGroup
for i, url := range urls {
wg.Add(1)
go func(index int, u string) {
defer wg.Done()
// セマフォを取得
sem <- struct{}{}
defer func() { <-sem }()
results[index] = download(ctx, u)
}(i, url)
}
wg.Wait()
return results
}
プロダクショングレードの実装
package main
import (
"context"
"fmt"
"io"
"net/http"
"sync"
"sync/atomic"
"time"
)
// Downloader はファイルダウンローダー
type Downloader struct {
client *http.Client
maxWorkers int
progressCh chan Progress
retryMax int
retryDelay time.Duration
}
// Progress はダウンロード進捗情報
type Progress struct {
URL string
BytesDownloaded int64
TotalBytes int64
Percentage float64
Error error
}
// NewDownloader は新しいダウンローダーを作成
func NewDownloader(maxWorkers, retryMax int) *Downloader {
return &Downloader{
client: &http.Client{
Timeout: 30 * time.Second,
},
maxWorkers: maxWorkers,
progressCh: make(chan Progress, 100),
retryMax: retryMax,
retryDelay: time.Second,
}
}
// Download は複数のURLを並行ダウンロード
func (d *Downloader) Download(ctx context.Context, urls []string) <-chan Progress {
results := make(chan Progress, len(urls))
go func() {
defer close(results)
sem := make(chan struct{}, d.maxWorkers)
var wg sync.WaitGroup
for _, url := range urls {
wg.Add(1)
go func(u string) {
defer wg.Done()
// セマフォを取得
select {
case sem <- struct{}{}:
defer func() { <-sem }()
case <-ctx.Done():
results <- Progress{URL: u, Error: ctx.Err()}
return
}
// リトライ付きダウンロード
progress := d.downloadWithRetry(ctx, u)
results <- progress
}(url)
}
wg.Wait()
}()
return results
}
// downloadWithRetry はリトライ付きダウンロード
func (d *Downloader) downloadWithRetry(ctx context.Context, url string) Progress {
var lastErr error
for attempt := 0; attempt <= d.retryMax; attempt++ {
if attempt > 0 {
select {
case <-time.After(d.retryDelay * time.Duration(attempt)):
case <-ctx.Done():
return Progress{URL: url, Error: ctx.Err()}
}
}
progress, err := d.downloadFile(ctx, url)
if err == nil {
return progress
}
lastErr = err
fmt.Printf("リトライ %d/%d: %s (%v)\n", attempt+1, d.retryMax, url, err)
}
return Progress{URL: url, Error: lastErr}
}
// downloadFile は実際のダウンロード処理
func (d *Downloader) downloadFile(ctx context.Context, url string) (Progress, error) {
req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
if err != nil {
return Progress{}, err
}
resp, err := d.client.Do(req)
if err != nil {
return Progress{}, err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return Progress{}, fmt.Errorf("bad status: %s", resp.Status)
}
totalBytes := resp.ContentLength
var downloadedBytes int64
// プログレスバーのシミュレーション
buffer := make([]byte, 32*1024)
for {
select {
case <-ctx.Done():
return Progress{}, ctx.Err()
default:
}
n, err := resp.Body.Read(buffer)
if n > 0 {
atomic.AddInt64(&downloadedBytes, int64(n))
if totalBytes > 0 {
percentage := float64(downloadedBytes) / float64(totalBytes) * 100
select {
case d.progressCh <- Progress{
URL: url,
BytesDownloaded: downloadedBytes,
TotalBytes: totalBytes,
Percentage: percentage,
}:
default:
}
}
}
if err == io.EOF {
break
}
if err != nil {
return Progress{}, err
}
}
return Progress{
URL: url,
BytesDownloaded: downloadedBytes,
TotalBytes: totalBytes,
Percentage: 100.0,
}, nil
}
// ProgressChannel は進捗チャネルを返す
func (d *Downloader) ProgressChannel() <-chan Progress {
return d.progressCh
}
// 使用例
func demonstrateDownloader() {
downloader := NewDownloader(5, 3)
urls := []string{
"https://example.com/file1.zip",
"https://example.com/file2.zip",
"https://example.com/file3.zip",
}
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel()
// 進捗モニタリング
go func() {
for progress := range downloader.ProgressChannel() {
fmt.Printf("[%.1f%%] %s: %d/%d bytes\n",
progress.Percentage, progress.URL,
progress.BytesDownloaded, progress.TotalBytes)
}
}()
// ダウンロード実行
results := downloader.Download(ctx, urls)
for result := range results {
if result.Error != nil {
fmt.Printf("エラー %s: %v\n", result.URL, result.Error)
} else {
fmt.Printf("完了 %s: %d bytes\n", result.URL, result.BytesDownloaded)
}
}
}
---
パフォーマンス比較
ベンチマーク
package main
import (
"sync"
"sync/atomic"
"testing"
)
// Mutex vs Atomic
func BenchmarkMutex(b *testing.B) {
var counter int
var mu sync.Mutex
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
mu.Lock()
counter++
mu.Unlock()
}
})
}
func BenchmarkAtomic(b *testing.B) {
var counter int64
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
atomic.AddInt64(&counter, 1)
}
})
}
// Channel vs Mutex
func BenchmarkChannel(b *testing.B) {
ch := make(chan int, 1)
ch <- 0
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
val := <-ch
val++
ch <- val
}
})
}
結果の例:
BenchmarkMutex-8 50000000 25.4 ns/op
BenchmarkAtomic-8 200000000 6.2 ns/op
BenchmarkChannel-8 10000000 156.0 ns/op
分析:
- Atomic操作が最速(Mutexの約4倍)
- チャネルはオーバーヘッドが大きいが、通信に適している
- 単純なカウンターにはatomic、複雑な状態管理にはMutexを使用
- 適切な同期プリミティブの選択: Mutex、RWMutex、atomic、チャネル
- リソース管理: ゴルーチンリークの防止、適切なクリーンアップ
- エラーハンドリング: リトライロジック、コンテキストによるキャンセル
- 監視と統計: メトリクスの収集、パフォーマンスの測定
- テスト:
-raceフラグでの競合検出、ベンチマークの実施
---
まとめ
プロダクショングレードの並行処理実装では以下が重要です: