Day 4: 並行処理の基礎 - 解答例

目次

---

並行処理アプローチの比較

並行処理を実装する際には、複数のアプローチがあります。それぞれのトレードオフを理解することが重要です。

アプローチ1: goroutineとsync.WaitGroup

長所:

  • シンプルで理解しやすい
  • 固定数のタスクに最適
  • オーバーヘッドが小さい

短所:

  • エラーハンドリングが難しい
  • 動的なタスク追加に対応しにくい
  • タイムアウトの実装が煩雑

ベストユースケース: 既知の固定数のタスクを並行実行する場合

アプローチ2: channelベース

長所:

  • エラーハンドリングが自然
  • 動的なタスク追加が容易
  • ストリーミング処理に最適

短所:

  • channelのバッファリングを考慮する必要がある
  • デッドロックのリスク
  • やや複雑

ベストユースケース: プロデューサー・コンシューマーパターン、ストリーミング処理

アプローチ3: worker poolパターン

長所:

  • リソース使用量を制御できる
  • スケーラビリティが高い
  • 本番環境に最適

短所:

  • 実装が複雑
  • オーバーヘッドが大きい

ベストユースケース: 大量のタスク処理、リソース制限が必要な場合

---

チャレンジ1: 並行ダウンローダー

解法1: 基本的なgoroutineとWaitGroup

package main

import (
    "fmt"
    "sync"
    "time"
)

// Result はダウンロード結果を表す構造体
type Result struct {
    URL   string        // ダウンロード対象のURL
    Size  int           // ダウンロードしたバイト数
    Error error         // エラーがあればここに格納
}

// download は指定されたURLをダウンロードする(シミュレーション)
// url: ダウンロード対象のURL
// results: 結果を送信するチャネル
// wg: goroutineの完了を通知するWaitGroup
func download(url string, results chan<- Result, wg *sync.WaitGroup) {
    defer wg.Done() // goroutine終了時にWaitGroupのカウンタをデクリメント

    // ダウンロード処理をシミュレート
    time.Sleep(time.Millisecond * 100) // 実際のネットワーク遅延を模倣

    // 結果をチャネルに送信
    // ここではURLの長さを基にサイズを計算(実際はHTTP Content-Lengthを使用)
    results <- Result{
        URL:   url,
        Size:  len(url) * 100, // 仮のサイズ計算
        Error: nil,
    }
}

func main() {
    // ダウンロード対象のURL一覧
    urls := []string{
        "https://example.com/file1.zip",
        "https://example.com/file2.zip",
        "https://example.com/file3.zip",
        "https://example.com/file4.zip",
        "https://example.com/file5.zip",
    }

    // バッファ付きチャネルを作成(全URL数分のバッファ)
    // バッファを持つことで、送信側がブロックされずに処理を続行できる
    results := make(chan Result, len(urls))

    // WaitGroupを初期化
    var wg sync.WaitGroup

    // 各URLに対してgoroutineを起動
    for _, url := range urls {
        wg.Add(1) // WaitGroupのカウンタをインクリメント
        go download(url, results, &wg) // 非同期にダウンロードを実行
    }

    // 全てのgoroutineが完了するまで待機
    wg.Wait()
    close(results) // 全ての結果が送信されたのでチャネルをクローズ

    // 結果を集計
    var totalSize int
    for result := range results {
        if result.Error != nil {
            fmt.Printf("エラー %s: %v\n", result.URL, result.Error)
        } else {
            fmt.Printf("完了 %s: %d bytes\n", result.URL, result.Size)
            totalSize += result.Size
        }
    }

    fmt.Printf("\n合計ダウンロード: %d bytes\n", totalSize)
}

解法2: channelベースアプローチ

package main

import (
    "context"
    "fmt"
    "time"
)

// DownloadTask はダウンロードタスクを表す
type DownloadTask struct {
    URL string
}

// DownloadResult はダウンロード結果を表す
type DownloadResult struct {
    URL      string
    Size     int
    Duration time.Duration
    Error    error
}

// downloadWorker はチャネルからタスクを受け取り、ダウンロードを実行する
// ctx: キャンセル制御用のコンテキスト
// tasks: 受信専用のタスクチャネル
// results: 送信専用の結果チャネル
func downloadWorker(ctx context.Context, tasks <-chan DownloadTask, results chan<- DownloadResult) {
    for task := range tasks { // tasksチャネルがクローズされるまでループ
        select {
        case <-ctx.Done(): // コンテキストがキャンセルされた場合
            return
        default:
            // ダウンロード処理を実行
            start := time.Now()

            // 実際のダウンロード処理(ここではシミュレーション)
            time.Sleep(time.Millisecond * 200)

            // 結果を送信
            results <- DownloadResult{
                URL:      task.URL,
                Size:     len(task.URL) * 100,
                Duration: time.Since(start),
                Error:    nil,
            }
        }
    }
}

func channelBasedDownload(urls []string, numWorkers int) []DownloadResult {
    // タイムアウト付きコンテキストを作成
    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer cancel() // 関数終了時にリソースをクリーンアップ

    // タスクと結果のチャネルを作成
    tasks := make(chan DownloadTask, len(urls))      // タスク用チャネル
    results := make(chan DownloadResult, len(urls))  // 結果用チャネル

    // ワーカーgoroutineを起動
    for i := 0; i < numWorkers; i++ {
        go downloadWorker(ctx, tasks, results) // 複数のワーカーを並行実行
    }

    // タスクをチャネルに送信
    for _, url := range urls {
        tasks <- DownloadTask{URL: url}
    }
    close(tasks) // 全タスク送信完了

    // 結果を収集
    downloadResults := make([]DownloadResult, 0, len(urls))
    for i := 0; i < len(urls); i++ {
        select {
        case result := <-results:
            downloadResults = append(downloadResults, result)
        case <-ctx.Done(): // タイムアウト発生
            fmt.Println("タイムアウトしました")
            return downloadResults
        }
    }

    return downloadResults
}

func main() {
    urls := []string{
        "https://example.com/file1.zip",
        "https://example.com/file2.zip",
        "https://example.com/file3.zip",
        "https://example.com/file4.zip",
        "https://example.com/file5.zip",
    }

    // 3つのワーカーで並行ダウンロード
    results := channelBasedDownload(urls, 3)

    // 結果を表示
    for _, result := range results {
        if result.Error != nil {
            fmt.Printf("エラー %s: %v\n", result.URL, result.Error)
        } else {
            fmt.Printf("完了 %s: %d bytes (所要時間: %v)\n",
                result.URL, result.Size, result.Duration)
        }
    }
}

解法3: worker poolパターン

この解法は最も堅牢で、本番環境に適しています。

package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

// Job はワーカープールで処理するジョブのインターフェース
type Job interface {
    Execute(ctx context.Context) (interface{}, error)
}

// DownloadJob はダウンロードジョブの実装
type DownloadJob struct {
    URL string
}

// Execute はジョブを実行する
func (j *DownloadJob) Execute(ctx context.Context) (interface{}, error) {
    start := time.Now()

    // ダウンロード処理(シミュレーション)
    select {
    case <-time.After(time.Millisecond * 150):
        // 成功
        return DownloadResult{
            URL:      j.URL,
            Size:     len(j.URL) * 100,
            Duration: time.Since(start),
        }, nil
    case <-ctx.Done():
        // キャンセル
        return nil, ctx.Err()
    }
}

// WorkerPool はワーカープールの実装
type WorkerPool struct {
    workers    int                    // ワーカー数
    jobs       chan Job               // ジョブキュー
    results    chan interface{}       // 結果チャネル
    errors     chan error             // エラーチャネル
    wg         sync.WaitGroup         // 完了待機用
    ctx        context.Context        // キャンセル制御用
    cancelFunc context.CancelFunc     // キャンセル関数
}

// NewWorkerPool は新しいワーカープールを作成
func NewWorkerPool(workers int) *WorkerPool {
    ctx, cancel := context.WithCancel(context.Background())
    return &WorkerPool{
        workers:    workers,
        jobs:       make(chan Job, workers*2),     // バッファサイズはワーカー数の2倍
        results:    make(chan interface{}, workers*2),
        errors:     make(chan error, workers*2),
        ctx:        ctx,
        cancelFunc: cancel,
    }
}

// Start はワーカープールを開始
func (wp *WorkerPool) Start() {
    for i := 0; i < wp.workers; i++ {
        wp.wg.Add(1)
        go wp.worker(i) // 各ワーカーをgoroutineとして起動
    }
}

// worker は個々のワーカーの処理ループ
func (wp *WorkerPool) worker(id int) {
    defer wp.wg.Done() // ワーカー終了時にWaitGroupをデクリメント

    fmt.Printf("Worker %d: 起動しました\n", id)

    for {
        select {
        case job, ok := <-wp.jobs:
            if !ok {
                // ジョブチャネルがクローズされた
                fmt.Printf("Worker %d: 終了します\n", id)
                return
            }

            fmt.Printf("Worker %d: ジョブを処理中\n", id)

            // ジョブを実行
            result, err := job.Execute(wp.ctx)

            if err != nil {
                // エラーが発生した場合
                select {
                case wp.errors <- err:
                case <-wp.ctx.Done():
                    return
                }
            } else {
                // 成功した場合
                select {
                case wp.results <- result:
                case <-wp.ctx.Done():
                    return
                }
            }

        case <-wp.ctx.Done():
            // コンテキストがキャンセルされた
            fmt.Printf("Worker %d: キャンセルされました\n", id)
            return
        }
    }
}

// Submit はジョブをワーカープールに投入
func (wp *WorkerPool) Submit(job Job) error {
    select {
    case wp.jobs <- job:
        return nil
    case <-wp.ctx.Done():
        return fmt.Errorf("worker pool is closed")
    }
}

// Results は結果チャネルを返す
func (wp *WorkerPool) Results() <-chan interface{} {
    return wp.results
}

// Errors はエラーチャネルを返す
func (wp *WorkerPool) Errors() <-chan error {
    return wp.errors
}

// Shutdown はワーカープールを安全にシャットダウン
func (wp *WorkerPool) Shutdown(timeout time.Duration) error {
    // ジョブチャネルをクローズ(新規ジョブを受け付けない)
    close(wp.jobs)

    // タイムアウト付きで全ワーカーの完了を待機
    done := make(chan struct{})
    go func() {
        wp.wg.Wait()        // 全ワーカーの完了を待機
        close(wp.results)   // 結果チャネルをクローズ
        close(wp.errors)    // エラーチャネルをクローズ
        close(done)
    }()

    select {
    case <-done:
        // 正常にシャットダウン完了
        fmt.Println("正常にシャットダウンしました")
        return nil
    case <-time.After(timeout):
        // タイムアウト発生 - 強制終了
        wp.cancelFunc() // コンテキストをキャンセルして全ワーカーを停止
        fmt.Println("タイムアウト - 強制終了しました")
        return fmt.Errorf("shutdown timeout")
    }
}

func main() {
    // 5つのワーカーを持つプールを作成
    pool := NewWorkerPool(5)
    pool.Start()

    // ダウンロードジョブを投入
    urls := []string{
        "https://example.com/file1.zip",
        "https://example.com/file2.zip",
        "https://example.com/file3.zip",
        "https://example.com/file4.zip",
        "https://example.com/file5.zip",
        "https://example.com/file6.zip",
        "https://example.com/file7.zip",
        "https://example.com/file8.zip",
        "https://example.com/file9.zip",
        "https://example.com/file10.zip",
    }

    // ジョブを非同期に投入
    go func() {
        for _, url := range urls {
            if err := pool.Submit(&DownloadJob{URL: url}); err != nil {
                fmt.Printf("ジョブ投入エラー: %v\n", err)
                return
            }
        }
    }()

    // 結果とエラーを収集
    var completedCount int
    var errorCount int

    // 別goroutineでエラーを監視
    go func() {
        for err := range pool.Errors() {
            fmt.Printf("エラー: %v\n", err)
            errorCount++
        }
    }()

    // 結果を収集
    for result := range pool.Results() {
        if dr, ok := result.(DownloadResult); ok {
            fmt.Printf("完了: %s (%d bytes, %v)\n", dr.URL, dr.Size, dr.Duration)
            completedCount++

            // 全ジョブが完了したらシャットダウン
            if completedCount == len(urls) {
                pool.Shutdown(5 * time.Second)
                break
            }
        }
    }

    fmt.Printf("\n完了: %d件, エラー: %d件\n", completedCount, errorCount)
}

チャレンジ2: TTLキャッシュ

基本実装

シンプルな実装から始めて、段階的に機能を追加していきます。

package main

import (
    "sync"
    "time"
)

// CacheItem はキャッシュアイテムを表す
type CacheItem struct {
    Value     string         // 保存する値
    ExpiresAt time.Time      // 有効期限
}

// TTLCache はTTL(Time To Live)付きキャッシュ
type TTLCache struct {
    mu    sync.RWMutex                // 読み書きロック
    items map[string]CacheItem        // アイテムを格納するマップ
}

// NewTTLCache は新しいTTLキャッシュを作成
func NewTTLCache() *TTLCache {
    return &TTLCache{
        items: make(map[string]CacheItem), // マップを初期化
    }
}

// Get はキーに対応する値を取得
func (c *TTLCache) Get(key string) (string, bool) {
    c.mu.RLock()              // 読み取りロックを取得
    defer c.mu.RUnlock()      // 関数終了時にアンロック

    item, ok := c.items[key]  // マップから取得
    if !ok {
        return "", false      // キーが存在しない
    }

    // 有効期限をチェック
    if time.Now().After(item.ExpiresAt) {
        return "", false      // 期限切れ
    }

    return item.Value, true   // 値を返す
}

// Set はキーと値を設定(TTL付き)
func (c *TTLCache) Set(key, value string, ttl time.Duration) {
    c.mu.Lock()               // 書き込みロックを取得
    defer c.mu.Unlock()       // 関数終了時にアンロック

    c.items[key] = CacheItem{
        Value:     value,
        ExpiresAt: time.Now().Add(ttl), // 有効期限を設定
    }
}

// Delete はキーを削除
func (c *TTLCache) Delete(key string) {
    c.mu.Lock()
    defer c.mu.Unlock()
    delete(c.items, key)
}

プロダクショングレードの完全実装

package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

// CacheEntry はキャッシュのエントリ
type CacheEntry struct {
    Value      interface{}
    ExpiresAt  time.Time
    AccessedAt time.Time
}

// ProductionTTLCache はプロダクションレベルのTTLキャッシュ
type ProductionTTLCache struct {
    mu         sync.RWMutex
    items      map[string]*CacheEntry
    maxSize    int
    defaultTTL time.Duration
    cleanupInterval time.Duration
    stats      CacheStats
    stopCh     chan struct{}
    wg         sync.WaitGroup
}

// CacheStats はキャッシュの統計情報
type CacheStats struct {
    mu          sync.RWMutex
    hits        int64
    misses      int64
    evictions   int64
    expirations int64
}

// NewProductionTTLCache は新しいキャッシュを作成
func NewProductionTTLCache(maxSize int, defaultTTL, cleanupInterval time.Duration) *ProductionTTLCache {
    cache := &ProductionTTLCache{
        items:           make(map[string]*CacheEntry),
        maxSize:         maxSize,
        defaultTTL:      defaultTTL,
        cleanupInterval: cleanupInterval,
        stopCh:          make(chan struct{}),
    }

    // バックグラウンドクリーンアップを開始
    cache.wg.Add(1)
    go cache.cleanupLoop()

    return cache
}

// Get はキーに対応する値を取得
func (c *ProductionTTLCache) Get(key string) (interface{}, bool) {
    c.mu.RLock()
    entry, exists := c.items[key]
    c.mu.RUnlock()

    if !exists {
        c.stats.recordMiss()
        return nil, false
    }

    if time.Now().After(entry.ExpiresAt) {
        c.stats.recordMiss()
        c.Delete(key)
        return nil, false
    }

    // アクセス時刻を更新(LRU用)
    c.mu.Lock()
    entry.AccessedAt = time.Now()
    c.mu.Unlock()

    c.stats.recordHit()
    return entry.Value, true
}

// Set はキーと値を設定
func (c *ProductionTTLCache) Set(key string, value interface{}) {
    c.SetWithTTL(key, value, c.defaultTTL)
}

// SetWithTTL はカスタムTTLでキーと値を設定
func (c *ProductionTTLCache) SetWithTTL(key string, value interface{}, ttl time.Duration) {
    c.mu.Lock()
    defer c.mu.Unlock()

    // サイズ制限チェック
    if len(c.items) >= c.maxSize {
        c.evictLRU()
    }

    c.items[key] = &CacheEntry{
        Value:      value,
        ExpiresAt:  time.Now().Add(ttl),
        AccessedAt: time.Now(),
    }
}

// Delete はキーを削除
func (c *ProductionTTLCache) Delete(key string) {
    c.mu.Lock()
    defer c.mu.Unlock()
    delete(c.items, key)
}

// Clear は全てのエントリを削除
func (c *ProductionTTLCache) Clear() {
    c.mu.Lock()
    defer c.mu.Unlock()
    c.items = make(map[string]*CacheEntry)
}

// Size は現在のエントリ数を返す
func (c *ProductionTTLCache) Size() int {
    c.mu.RLock()
    defer c.mu.RUnlock()
    return len(c.items)
}

// evictLRU はLRU(最も古いアクセス)のエントリを削除
func (c *ProductionTTLCache) evictLRU() {
    var oldestKey string
    var oldestTime time.Time

    for key, entry := range c.items {
        if oldestKey == "" || entry.AccessedAt.Before(oldestTime) {
            oldestKey = key
            oldestTime = entry.AccessedAt
        }
    }

    if oldestKey != "" {
        delete(c.items, oldestKey)
        c.stats.recordEviction()
    }
}

// cleanupLoop は期限切れエントリを定期的にクリーンアップ
func (c *ProductionTTLCache) cleanupLoop() {
    defer c.wg.Done()

    ticker := time.NewTicker(c.cleanupInterval)
    defer ticker.Stop()

    for {
        select {
        case <-ticker.C:
            c.cleanup()
        case <-c.stopCh:
            return
        }
    }
}

// cleanup は期限切れエントリを削除
func (c *ProductionTTLCache) cleanup() {
    c.mu.Lock()
    defer c.mu.Unlock()

    now := time.Now()
    for key, entry := range c.items {
        if now.After(entry.ExpiresAt) {
            delete(c.items, key)
            c.stats.recordExpiration()
        }
    }
}

// Close はキャッシュをクローズ
func (c *ProductionTTLCache) Close() {
    close(c.stopCh)
    c.wg.Wait()
}

// GetStats は統計情報を取得
func (c *ProductionTTLCache) GetStats() (hits, misses, evictions, expirations int64) {
    return c.stats.get()
}

// CacheStats のメソッド
func (s *CacheStats) recordHit() {
    s.mu.Lock()
    defer s.mu.Unlock()
    s.hits++
}

func (s *CacheStats) recordMiss() {
    s.mu.Lock()
    defer s.mu.Unlock()
    s.misses++
}

func (s *CacheStats) recordEviction() {
    s.mu.Lock()
    defer s.mu.Unlock()
    s.evictions++
}

func (s *CacheStats) recordExpiration() {
    s.mu.Lock()
    defer s.mu.Unlock()
    s.expirations++
}

func (s *CacheStats) get() (hits, misses, evictions, expirations int64) {
    s.mu.RLock()
    defer s.mu.RUnlock()
    return s.hits, s.misses, s.evictions, s.expirations
}

// 使用例
func demonstrateTTLCache() {
    cache := NewProductionTTLCache(
        100,                    // 最大100エントリ
        5*time.Second,          // デフォルトTTL
        1*time.Second,          // クリーンアップ間隔
    )
    defer cache.Close()

    // データを設定
    cache.Set("user:1", map[string]string{"name": "太郎"})
    cache.SetWithTTL("session:abc", "active", 10*time.Second)

    // データを取得
    if value, ok := cache.Get("user:1"); ok {
        fmt.Printf("取得成功: %v\n", value)
    }

    // 統計情報を表示
    time.Sleep(2 * time.Second)
    hits, misses, evictions, expirations := cache.GetStats()
    fmt.Printf("ヒット: %d, ミス: %d, 削除: %d, 期限切れ: %d\n",
        hits, misses, evictions, expirations)
}

---

Optimization Path: 進化の過程

並行処理の実装を段階的に改善していく過程を示します。

ステージ1: 素朴な実装

// 問題点:エラーハンドリングなし、タイムアウトなし、リソース制限なし
func naiveParallelProcess(items []string) {
    for _, item := range items {
        go func(i string) {
            // 処理
            process(i)
        }(item)
    }
    // 完了を待たずに終了してしまう!
}

問題点:

  • goroutineの完了を待たない
  • エラーを捕捉できない
  • 無制限にgoroutineを生成

ステージ2: WaitGroupでの完了待機

// 改善点:完了を正しく待機
func betterParallelProcess(items []string) {
    var wg sync.WaitGroup // 完了を追跡

    for _, item := range items {
        wg.Add(1) // カウンタをインクリメント
        go func(i string) {
            defer wg.Done() // 必ず実行される
            process(i)
        }(item)
    }

    wg.Wait() // 全goroutineの完了を待機
}

改善点:

  • 全goroutineの完了を正しく待機
  • defer wg.Done()でパニック時も確実にカウントダウン

残る問題点:

  • エラーハンドリングがない
  • タイムアウトがない

ステージ3: エラーハンドリング追加

func withErrorHandling(items []string) []error {
    var (
        wg     sync.WaitGroup
        mu     sync.Mutex        // エラースライス保護用
        errors []error
    )

    for _, item := range items {
        wg.Add(1)
        go func(i string) {
            defer wg.Done()

            if err := process(i); err != nil {
                mu.Lock()               // ロックを取得
                errors = append(errors, err) // スレッドセーフに追加
                mu.Unlock()             // ロックを解放
            }
        }(item)
    }

    wg.Wait()
    return errors // 全エラーを返す
}

改善点:

  • エラーを収集して返す
  • Mutexで競合状態を防ぐ

ステージ4: タイムアウト追加

func withTimeout(items []string, timeout time.Duration) ([]error, error) {
    ctx, cancel := context.WithTimeout(context.Background(), timeout)
    defer cancel() // リソースを必ずクリーンアップ

    var (
        wg     sync.WaitGroup
        mu     sync.Mutex
        errors []error
    )

    for _, item := range items {
        wg.Add(1)
        go func(i string) {
            defer wg.Done()

            // コンテキストをprocessに渡す
            if err := processWithContext(ctx, i); err != nil {
                mu.Lock()
                errors = append(errors, err)
                mu.Unlock()
            }
        }(item)
    }

    // 完了またはタイムアウトを待機
    done := make(chan struct{})
    go func() {
        wg.Wait()
        close(done)
    }()

    select {
    case <-done:
        return errors, nil // 正常完了
    case <-ctx.Done():
        return errors, fmt.Errorf("timeout exceeded") // タイムアウト
    }
}

改善点:

  • コンテキストでタイムアウトを制御
  • 処理が長引いてもタイムアウトで中断

ステージ5: Graceful Shutdown(本番品質)

type GracefulProcessor struct {
    maxWorkers int
    timeout    time.Duration
    sem        chan struct{}  // セマフォでワーカー数制限
}

func NewGracefulProcessor(maxWorkers int, timeout time.Duration) *GracefulProcessor {
    return &GracefulProcessor{
        maxWorkers: maxWorkers,
        timeout:    timeout,
        sem:        make(chan struct{}, maxWorkers), // バッファ付きチャネル
    }
}

func (gp *GracefulProcessor) Process(items []string) ([]error, error) {
    ctx, cancel := context.WithTimeout(context.Background(), gp.timeout)
    defer cancel()

    var (
        wg     sync.WaitGroup
        mu     sync.Mutex
        errors []error
    )

    for _, item := range items {
        // セマフォを取得(ワーカー数を制限)
        select {
        case gp.sem <- struct{}{}:
            // セマフォ取得成功
        case <-ctx.Done():
            // タイムアウト
            return errors, ctx.Err()
        }

        wg.Add(1)
        go func(i string) {
            defer func() {
                <-gp.sem     // セマフォを解放
                wg.Done()    // WaitGroupをデクリメント
            }()

            // パニックからの回復
            defer func() {
                if r := recover(); r != nil {
                    mu.Lock()
                    errors = append(errors, fmt.Errorf("panic: %v", r))
                    mu.Unlock()
                }
            }()

            if err := processWithContext(ctx, i); err != nil {
                mu.Lock()
                errors = append(errors, err)
                mu.Unlock()
            }
        }(item)
    }

    // Graceful shutdown
    done := make(chan struct{})
    go func() {
        wg.Wait()
        close(done)
    }()

    select {
    case <-done:
        return errors, nil
    case <-ctx.Done():
        // タイムアウト発生 - 進行中の処理の完了を待つ
        <-done
        return errors, fmt.Errorf("timeout: some tasks may be incomplete")
    }
}

最終改善点:

  • セマフォでワーカー数を制限(リソース保護)
  • パニックからの回復
  • Graceful shutdown(進行中の処理は完了させる)
  • 本番環境で使用可能な品質

---

Common Mistakes: よくある間違い

間違い1: goroutineリーク

// 悪い例:goroutineがリークする
func badExample() {
    ch := make(chan int) // バッファなし

    go func() {
        value := <-ch // 永遠に待ち続ける
        fmt.Println(value)
    }()

    // chに送信せずに関数が終了
    // goroutineは永遠に残る(リーク)
}

// 良い例:タイムアウトを設定
func goodExample() {
    ch := make(chan int)

    go func() {
        select {
        case value := <-ch:
            fmt.Println(value)
        case <-time.After(time.Second):
            fmt.Println("タイムアウト")
            return // goroutineが終了する
        }
    }()

    time.Sleep(2 * time.Second)
}

間違い2: データ競合

// 悪い例:データ競合が発生
func badRace() {
    counter := 0 // 共有変数
    var wg sync.WaitGroup

    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            counter++ // 競合!複数goroutineが同時にアクセス
        }()
    }

    wg.Wait()
    fmt.Println(counter) // 1000にならない可能性が高い
}

// 良い例1:Mutexで保護
func goodMutex() {
    counter := 0
    var mu sync.Mutex
    var wg sync.WaitGroup

    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            mu.Lock()         // ロック取得
            counter++         // クリティカルセクション
            mu.Unlock()       // ロック解放
        }()
    }

    wg.Wait()
    fmt.Println(counter) // 必ず1000
}

// 良い例2:atomic操作(最速)
func goodAtomic() {
    var counter int64 // int64型
    var wg sync.WaitGroup

    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            atomic.AddInt64(&counter, 1) // アトミックに加算
        }()
    }

    wg.Wait()
    fmt.Println(atomic.LoadInt64(&counter)) // 必ず1000
}

間違い3: デッドロック

// 悪い例:デッドロックが発生
func badDeadlock() {
    var mu1, mu2 sync.Mutex

    // goroutine 1
    go func() {
        mu1.Lock()
        time.Sleep(time.Millisecond)
        mu2.Lock() // goroutine 2がmu2を保持している
        // デッドロック!
        mu2.Unlock()
        mu1.Unlock()
    }()

    // goroutine 2
    go func() {
        mu2.Lock()
        time.Sleep(time.Millisecond)
        mu1.Lock() // goroutine 1がmu1を保持している
        // デッドロック!
        mu1.Unlock()
        mu2.Unlock()
    }()

    time.Sleep(time.Second)
}

// 良い例:ロックの順序を統一
func goodNoDeadlock() {
    var mu1, mu2 sync.Mutex

    // 常にmu1 → mu2の順でロック
    lockBoth := func() {
        mu1.Lock()
        mu2.Lock()
    }

    unlockBoth := func() {
        mu2.Unlock() // 逆順で解放
        mu1.Unlock()
    }

    // goroutine 1
    go func() {
        lockBoth()   // 同じ順序
        // 処理
        unlockBoth()
    }()

    // goroutine 2
    go func() {
        lockBoth()   // 同じ順序
        // 処理
        unlockBoth()
    }()
}

間違い4: バッファなしチャネルでのデッドロック

// 悪い例:送信側がブロック
func badChannelDeadlock() {
    ch := make(chan int) // バッファなし

    ch <- 42 // 受信側がいないのでブロック!
    // デッドロック

    value := <-ch
    fmt.Println(value)
}

// 良い例1:goroutineで送信
func goodChannelGoroutine() {
    ch := make(chan int)

    go func() {
        ch <- 42 // 別goroutineで送信
    }()

    value := <-ch // メインgoroutineで受信
    fmt.Println(value)
}

// 良い例2:バッファ付きチャネル
func goodChannelBuffered() {
    ch := make(chan int, 1) // バッファサイズ1

    ch <- 42 // バッファに格納(ブロックしない)

    value := <-ch
    fmt.Println(value)
}

間違い5: ループ変数のキャプチャ

// 悪い例:全てのgoroutineが最後の値を使用
func badLoopCapture() {
    var wg sync.WaitGroup

    for i := 0; i < 5; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            fmt.Println(i) // 全て「5」を出力する可能性が高い
        }()
    }

    wg.Wait()
}

// 良い例1:引数で渡す
func goodLoopParam() {
    var wg sync.WaitGroup

    for i := 0; i < 5; i++ {
        wg.Add(1)
        go func(n int) { // パラメータとして受け取る
            defer wg.Done()
            fmt.Println(n) // 正しい値が出力される
        }(i) // 引数として渡す
    }

    wg.Wait()
}

// 良い例2:ローカル変数にコピー
func goodLoopCopy() {
    var wg sync.WaitGroup

    for i := 0; i < 5; i++ {
        i := i // シャドーイング(ローカル変数にコピー)
        wg.Add(1)
        go func() {
            defer wg.Done()
            fmt.Println(i) // 正しい値
        }()
    }

    wg.Wait()
}

---

パフォーマンス分析

ベンチマーク比較

package main

import (
    "sync"
    "sync/atomic"
    "testing"
)

// ベンチマーク1: Mutex
func BenchmarkMutex(b *testing.B) {
    var counter int
    var mu sync.Mutex

    b.RunParallel(func(pb *testing.PB) {
        for pb.Next() {
            mu.Lock()
            counter++
            mu.Unlock()
        }
    })
}

// ベンチマーク2: RWMutex(読み取り)
func BenchmarkRWMutexRead(b *testing.B) {
    var counter int
    var mu sync.RWMutex

    b.RunParallel(func(pb *testing.PB) {
        for pb.Next() {
            mu.RLock()
            _ = counter
            mu.RUnlock()
        }
    })
}

// ベンチマーク3: RWMutex(書き込み)
func BenchmarkRWMutexWrite(b *testing.B) {
    var counter int
    var mu sync.RWMutex

    b.RunParallel(func(pb *testing.PB) {
        for pb.Next() {
            mu.Lock()
            counter++
            mu.Unlock()
        }
    })
}

// ベンチマーク4: Atomic
func BenchmarkAtomic(b *testing.B) {
    var counter int64

    b.RunParallel(func(pb *testing.PB) {
        for pb.Next() {
            atomic.AddInt64(&counter, 1)
        }
    })
}

// ベンチマーク5: Channel
func BenchmarkChannel(b *testing.B) {
    ch := make(chan int, 1)
    ch <- 0

    b.RunParallel(func(pb *testing.PB) {
        for pb.Next() {
            val := <-ch
            val++
            ch <- val
        }
    })
}

実行例:

go test -bench=. -benchmem -cpu=1,2,4,8

典型的な結果 (Apple M1, 8 cores):

BenchmarkMutex-8              47,234,156      25.4 ns/op      0 B/op   0 allocs/op
BenchmarkRWMutexRead-8        89,445,677      13.1 ns/op      0 B/op   0 allocs/op
BenchmarkRWMutexWrite-8       45,123,789      26.7 ns/op      0 B/op   0 allocs/op
BenchmarkAtomic-8            198,567,234       6.2 ns/op      0 B/op   0 allocs/op
BenchmarkChannel-8            10,234,567     156.0 ns/op      0 B/op   0 allocs/op

分析:

  • Atomic: 最速(6.2 ns/op)- 単純なカウンターに最適
  • RWMutex(読み取り): 読み取り専用なら高速(13.1 ns/op)
  • Mutex/RWMutex(書き込み): 中程度(25-27 ns/op)
  • Channel: 最遅(156 ns/op)- 通信には最適だがオーバーヘッドが大きい

並行度とパフォーマンスの関係

package main

import (
    "fmt"
    "runtime"
    "sync"
    "time"
)

func benchmarkWorkers(taskCount, workerCount int) time.Duration {
    tasks := make(chan int, taskCount)
    var wg sync.WaitGroup

    // タスクを生成
    for i := 0; i < taskCount; i++ {
        tasks <- i
    }
    close(tasks)

    start := time.Now()

    // ワーカーを起動
    for i := 0; i < workerCount; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for task := range tasks {
                // CPU集約的な処理をシミュレート
                _ = fibonacci(20)
                _ = task
            }
        }()
    }

    wg.Wait()
    return time.Since(start)
}

func fibonacci(n int) int {
    if n <= 1 {
        return n
    }
    return fibonacci(n-1) + fibonacci(n-2)
}

func main() {
    runtime.GOMAXPROCS(runtime.NumCPU()) // CPUコア数を使用

    taskCount := 1000
    workerCounts := []int{1, 2, 4, 8, 16, 32, 64, 128}

    fmt.Println("並行度とパフォーマンスの関係")
    fmt.Println("タスク数:", taskCount)
    fmt.Println("CPUコア数:", runtime.NumCPU())
    fmt.Println()

    for _, workers := range workerCounts {
        duration := benchmarkWorkers(taskCount, workers)
        fmt.Printf("ワーカー数: %3d -> 所要時間: %v\n", workers, duration)
    }
}

典型的な出力 (8コアCPU):

並行度とパフォーマンスの関係
タスク数: 1000
CPUコア数: 8

ワーカー数:   1 -> 所要時間: 15.234s
ワーカー数:   2 -> 所要時間: 7.891s
ワーカー数:   4 -> 所要時間: 4.123s
ワーカー数:   8 -> 所要時間: 2.045s    ← 最適(CPUコア数と一致)
ワーカー数:  16 -> 所要時間: 2.134s
ワーカー数:  32 -> 所要時間: 2.345s
ワーカー数:  64 -> 所要時間: 2.567s
ワーカー数: 128 -> 所要時間: 2.789s

結論:

  • CPU集約的タスク:ワーカー数 = CPUコア数が最適
  • I/O集約的タスク:より多くのワーカーが有効(待機時間が多いため)
  • ---

    プロダクションレディコード

    本番環境で使用できる、完全な並行処理システムの実装例です。

    package main
    
    import (
        "context"
        "fmt"
        "log"
        "os"
        "os/signal"
        "sync"
        "sync/atomic"
        "syscall"
        "time"
    )
    
    // Task は処理するタスクのインターフェース
    type Task interface {
        Execute(ctx context.Context) error
        ID() string
    }
    
    // TaskProcessor はタスクを並行処理するシステム
    type TaskProcessor struct {
        workers      int
        queue        chan Task
        results      chan Result
        errors       chan error
        stats        *Statistics
        wg           sync.WaitGroup
        ctx          context.Context
        cancel       context.CancelFunc
        shutdownOnce sync.Once
    }
    
    // Result はタスクの実行結果
    type Result struct {
        TaskID    string
        StartTime time.Time
        EndTime   time.Time
        Error     error
    }
    
    // Statistics は処理統計
    type Statistics struct {
        totalTasks     int64
        completedTasks int64
        failedTasks    int64
        activeWorkers  int64
    }
    
    // NewTaskProcessor は新しいタスクプロセッサを作成
    func NewTaskProcessor(workers, queueSize int) *TaskProcessor {
        ctx, cancel := context.WithCancel(context.Background())
    
        return &TaskProcessor{
            workers: workers,
            queue:   make(chan Task, queueSize),
            results: make(chan Result, queueSize),
            errors:  make(chan error, queueSize),
            stats:   &Statistics{},
            ctx:     ctx,
            cancel:  cancel,
        }
    }
    
    // Start はワーカーを起動
    func (tp *TaskProcessor) Start() {
        log.Printf("タスクプロセッサを起動(ワーカー数: %d)\n", tp.workers)
    
        for i := 0; i < tp.workers; i++ {
            tp.wg.Add(1)
            go tp.worker(i)
        }
    
        // 統計情報を定期的に出力
        go tp.statsReporter()
    }
    
    // worker は個々のワーカーの処理ループ
    func (tp *TaskProcessor) worker(id int) {
        defer tp.wg.Done()
    
        atomic.AddInt64(&tp.stats.activeWorkers, 1)
        defer atomic.AddInt64(&tp.stats.activeWorkers, -1)
    
        log.Printf("Worker %d: 起動\n", id)
    
        for {
            select {
            case task, ok := <-tp.queue:
                if !ok {
                    log.Printf("Worker %d: キューがクローズされました\n", id)
                    return
                }
    
                tp.processTask(id, task)
    
            case <-tp.ctx.Done():
                log.Printf("Worker %d: コンテキストがキャンセルされました\n", id)
                return
            }
        }
    }
    
    // processTask はタスクを処理
    func (tp *TaskProcessor) processTask(workerID int, task Task) {
        startTime := time.Now()
    
        defer func() {
            if r := recover(); r != nil {
                log.Printf("Worker %d: パニック回復 - タスクID: %s, エラー: %v\n",
                    workerID, task.ID(), r)
                atomic.AddInt64(&tp.stats.failedTasks, 1)
    
                tp.results <- Result{
                    TaskID:    task.ID(),
                    StartTime: startTime,
                    EndTime:   time.Now(),
                    Error:     fmt.Errorf("panic: %v", r),
                }
            }
        }()
    
        log.Printf("Worker %d: タスク %s を処理中\n", workerID, task.ID())
    
        // タスクを実行
        err := task.Execute(tp.ctx)
    
        endTime := time.Now()
        duration := endTime.Sub(startTime)
    
        if err != nil {
            atomic.AddInt64(&tp.stats.failedTasks, 1)
            log.Printf("Worker %d: タスク %s 失敗(所要時間: %v, エラー: %v)\n",
                workerID, task.ID(), duration, err)
        } else {
            atomic.AddInt64(&tp.stats.completedTasks, 1)
            log.Printf("Worker %d: タスク %s 完了(所要時間: %v)\n",
                workerID, task.ID(), duration)
        }
    
        // 結果を送信
        select {
        case tp.results <- Result{
            TaskID:    task.ID(),
            StartTime: startTime,
            EndTime:   endTime,
            Error:     err,
        }:
        case <-tp.ctx.Done():
            return
        }
    }
    
    // Submit はタスクをキューに追加
    func (tp *TaskProcessor) Submit(task Task) error {
        atomic.AddInt64(&tp.stats.totalTasks, 1)
    
        select {
        case tp.queue <- task:
            return nil
        case <-tp.ctx.Done():
            return fmt.Errorf("プロセッサがシャットダウン中です")
        default:
            return fmt.Errorf("キューが満杯です")
        }
    }
    
    // Results は結果チャネルを返す
    func (tp *TaskProcessor) Results() <-chan Result {
        return tp.results
    }
    
    // Shutdown はプロセッサを安全にシャットダウン
    func (tp *TaskProcessor) Shutdown(timeout time.Duration) error {
        var shutdownErr error
    
        tp.shutdownOnce.Do(func() {
            log.Println("シャットダウンを開始します...")
    
            // 新規タスクの受付を停止
            close(tp.queue)
    
            // タイムアウト付きで完了を待機
            done := make(chan struct{})
            go func() {
                tp.wg.Wait()
                close(tp.results)
                close(tp.errors)
                close(done)
            }()
    
            select {
            case <-done:
                log.Println("正常にシャットダウンしました")
                shutdownErr = nil
            case <-time.After(timeout):
                log.Println("シャットダウンタイムアウト - 強制終了します")
                tp.cancel() // 強制キャンセル
                <-done      // それでも完了を待つ
                shutdownErr = fmt.Errorf("shutdown timeout")
            }
        })
    
        return shutdownErr
    }
    
    // GetStats は統計情報を取得
    func (tp *TaskProcessor) GetStats() (total, completed, failed, active int64) {
        return atomic.LoadInt64(&tp.stats.totalTasks),
            atomic.LoadInt64(&tp.stats.completedTasks),
            atomic.LoadInt64(&tp.stats.failedTasks),
            atomic.LoadInt64(&tp.stats.activeWorkers)
    }
    
    // statsReporter は定期的に統計情報を出力
    func (tp *TaskProcessor) statsReporter() {
        ticker := time.NewTicker(5 * time.Second)
        defer ticker.Stop()
    
        for {
            select {
            case <-ticker.C:
                total, completed, failed, active := tp.GetStats()
                log.Printf("統計 - 総タスク数: %d, 完了: %d, 失敗: %d, アクティブワーカー: %d\n",
                    total, completed, failed, active)
            case <-tp.ctx.Done():
                return
            }
        }
    }
    
    // 使用例: シンプルなタスク実装
    type SimpleTask struct {
        id       string
        duration time.Duration
    }
    
    func (t *SimpleTask) ID() string {
        return t.id
    }
    
    func (t *SimpleTask) Execute(ctx context.Context) error {
        select {
        case <-time.After(t.duration):
            return nil
        case <-ctx.Done():
            return ctx.Err()
        }
    }
    
    func main() {
        // プロセッサを作成
        processor := NewTaskProcessor(5, 100) // 5ワーカー、キューサイズ100
        processor.Start()
    
        // シグナルハンドリング
        sigChan := make(chan os.Signal, 1)
        signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)
    
        go func() {
            <-sigChan
            log.Println("シグナル受信 - シャットダウンします")
            processor.Shutdown(10 * time.Second)
            os.Exit(0)
        }()
    
        // タスクを投入
        for i := 0; i < 50; i++ {
            task := &SimpleTask{
                id:       fmt.Sprintf("task-%d", i),
                duration: time.Millisecond * time.Duration(100+i*10),
            }
    
            if err := processor.Submit(task); err != nil {
                log.Printf("タスク投入失敗: %v\n", err)
            }
        }
    
        // 結果を収集
        go func() {
            for result := range processor.Results() {
                if result.Error != nil {
                    log.Printf("結果: %s - エラー: %v\n", result.TaskID, result.Error)
                } else {
                    duration := result.EndTime.Sub(result.StartTime)
                    log.Printf("結果: %s - 成功(所要時間: %v)\n", result.TaskID, duration)
                }
            }
        }()
    
        // 10秒後にシャットダウン
        time.Sleep(10 * time.Second)
        if err := processor.Shutdown(5 * time.Second); err != nil {
            log.Printf("シャットダウンエラー: %v\n", err)
        }
    
        // 最終統計を出力
        total, completed, failed, _ := processor.GetStats()
        log.Printf("最終統計 - 総タスク数: %d, 完了: %d, 失敗: %d\n",
            total, completed, failed)
    }
    

    このプロダクションレディコードは以下の特徴を持っています:

  • Graceful Shutdown: タイムアウト付きの安全なシャットダウン
  • パニック回復: ワーカーがパニックしても他のワーカーに影響しない
  • 統計情報: リアルタイムでパフォーマンスを監視
  • シグナルハンドリング: OSシグナルで安全に終了
  • コンテキスト制御: タイムアウトとキャンセルの伝播
  • エラーハンドリング: 包括的なエラー処理
  • スレッドセーフ: atomic操作で競合状態を回避
  • ---

    まとめ

    並行処理の実装では以下が重要です:

  • 適切なアプローチの選択: WaitGroup、channel、worker poolから状況に応じて選択
  • 段階的な改善: 素朴な実装から本番品質へ進化させる
  • よくある間違いの回避: goroutineリーク、データ競合、デッドロックに注意
  • パフォーマンスの理解: 同期プリミティブの特性を理解して最適な選択
  • 本番環境への対応: graceful shutdown、エラーハンドリング、監視機能の実装

これらの原則を理解し実践することで、堅牢で高性能な並行処理システムを構築できます。

---

チャレンジ3の解答: 並行ダウンローダー

基本実装

package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

type DownloadResult struct {
    URL      string
    Size     int
    Duration time.Duration
    Error    error
}

func download(ctx context.Context, url string) DownloadResult {
    start := time.Now()

    // ダウンロードをシミュレート
    select {
    case <-time.After(time.Millisecond * 500):
        return DownloadResult{
            URL:      url,
            Size:     1024 * 1024,
            Duration: time.Since(start),
            Error:    nil,
        }
    case <-ctx.Done():
        return DownloadResult{
            URL:      url,
            Error:    ctx.Err(),
            Duration: time.Since(start),
        }
    }
}

func parallelDownload(urls []string, maxConcurrent int) []DownloadResult {
    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer cancel()

    results := make([]DownloadResult, len(urls))
    sem := make(chan struct{}, maxConcurrent)
    var wg sync.WaitGroup

    for i, url := range urls {
        wg.Add(1)
        go func(index int, u string) {
            defer wg.Done()

            // セマフォを取得
            sem <- struct{}{}
            defer func() { <-sem }()

            results[index] = download(ctx, u)
        }(i, url)
    }

    wg.Wait()
    return results
}

プロダクショングレードの実装

package main

import (
    "context"
    "fmt"
    "io"
    "net/http"
    "sync"
    "sync/atomic"
    "time"
)

// Downloader はファイルダウンローダー
type Downloader struct {
    client      *http.Client
    maxWorkers  int
    progressCh  chan Progress
    retryMax    int
    retryDelay  time.Duration
}

// Progress はダウンロード進捗情報
type Progress struct {
    URL           string
    BytesDownloaded int64
    TotalBytes    int64
    Percentage    float64
    Error         error
}

// NewDownloader は新しいダウンローダーを作成
func NewDownloader(maxWorkers, retryMax int) *Downloader {
    return &Downloader{
        client: &http.Client{
            Timeout: 30 * time.Second,
        },
        maxWorkers: maxWorkers,
        progressCh: make(chan Progress, 100),
        retryMax:   retryMax,
        retryDelay: time.Second,
    }
}

// Download は複数のURLを並行ダウンロード
func (d *Downloader) Download(ctx context.Context, urls []string) <-chan Progress {
    results := make(chan Progress, len(urls))

    go func() {
        defer close(results)

        sem := make(chan struct{}, d.maxWorkers)
        var wg sync.WaitGroup

        for _, url := range urls {
            wg.Add(1)

            go func(u string) {
                defer wg.Done()

                // セマフォを取得
                select {
                case sem <- struct{}{}:
                    defer func() { <-sem }()
                case <-ctx.Done():
                    results <- Progress{URL: u, Error: ctx.Err()}
                    return
                }

                // リトライ付きダウンロード
                progress := d.downloadWithRetry(ctx, u)
                results <- progress

            }(url)
        }

        wg.Wait()
    }()

    return results
}

// downloadWithRetry はリトライ付きダウンロード
func (d *Downloader) downloadWithRetry(ctx context.Context, url string) Progress {
    var lastErr error

    for attempt := 0; attempt <= d.retryMax; attempt++ {
        if attempt > 0 {
            select {
            case <-time.After(d.retryDelay * time.Duration(attempt)):
            case <-ctx.Done():
                return Progress{URL: url, Error: ctx.Err()}
            }
        }

        progress, err := d.downloadFile(ctx, url)
        if err == nil {
            return progress
        }

        lastErr = err
        fmt.Printf("リトライ %d/%d: %s (%v)\n", attempt+1, d.retryMax, url, err)
    }

    return Progress{URL: url, Error: lastErr}
}

// downloadFile は実際のダウンロード処理
func (d *Downloader) downloadFile(ctx context.Context, url string) (Progress, error) {
    req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
    if err != nil {
        return Progress{}, err
    }

    resp, err := d.client.Do(req)
    if err != nil {
        return Progress{}, err
    }
    defer resp.Body.Close()

    if resp.StatusCode != http.StatusOK {
        return Progress{}, fmt.Errorf("bad status: %s", resp.Status)
    }

    totalBytes := resp.ContentLength
    var downloadedBytes int64

    // プログレスバーのシミュレーション
    buffer := make([]byte, 32*1024)
    for {
        select {
        case <-ctx.Done():
            return Progress{}, ctx.Err()
        default:
        }

        n, err := resp.Body.Read(buffer)
        if n > 0 {
            atomic.AddInt64(&downloadedBytes, int64(n))

            if totalBytes > 0 {
                percentage := float64(downloadedBytes) / float64(totalBytes) * 100
                select {
                case d.progressCh <- Progress{
                    URL:             url,
                    BytesDownloaded: downloadedBytes,
                    TotalBytes:      totalBytes,
                    Percentage:      percentage,
                }:
                default:
                }
            }
        }

        if err == io.EOF {
            break
        }
        if err != nil {
            return Progress{}, err
        }
    }

    return Progress{
        URL:             url,
        BytesDownloaded: downloadedBytes,
        TotalBytes:      totalBytes,
        Percentage:      100.0,
    }, nil
}

// ProgressChannel は進捗チャネルを返す
func (d *Downloader) ProgressChannel() <-chan Progress {
    return d.progressCh
}

// 使用例
func demonstrateDownloader() {
    downloader := NewDownloader(5, 3)

    urls := []string{
        "https://example.com/file1.zip",
        "https://example.com/file2.zip",
        "https://example.com/file3.zip",
    }

    ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
    defer cancel()

    // 進捗モニタリング
    go func() {
        for progress := range downloader.ProgressChannel() {
            fmt.Printf("[%.1f%%] %s: %d/%d bytes\n",
                progress.Percentage, progress.URL,
                progress.BytesDownloaded, progress.TotalBytes)
        }
    }()

    // ダウンロード実行
    results := downloader.Download(ctx, urls)
    for result := range results {
        if result.Error != nil {
            fmt.Printf("エラー %s: %v\n", result.URL, result.Error)
        } else {
            fmt.Printf("完了 %s: %d bytes\n", result.URL, result.BytesDownloaded)
        }
    }
}

---

パフォーマンス比較

ベンチマーク

package main

import (
    "sync"
    "sync/atomic"
    "testing"
)

// Mutex vs Atomic
func BenchmarkMutex(b *testing.B) {
    var counter int
    var mu sync.Mutex

    b.RunParallel(func(pb *testing.PB) {
        for pb.Next() {
            mu.Lock()
            counter++
            mu.Unlock()
        }
    })
}

func BenchmarkAtomic(b *testing.B) {
    var counter int64

    b.RunParallel(func(pb *testing.PB) {
        for pb.Next() {
            atomic.AddInt64(&counter, 1)
        }
    })
}

// Channel vs Mutex
func BenchmarkChannel(b *testing.B) {
    ch := make(chan int, 1)
    ch <- 0

    b.RunParallel(func(pb *testing.PB) {
        for pb.Next() {
            val := <-ch
            val++
            ch <- val
        }
    })
}

結果の例:

BenchmarkMutex-8     50000000    25.4 ns/op
BenchmarkAtomic-8   200000000     6.2 ns/op
BenchmarkChannel-8   10000000   156.0 ns/op

分析:

  • Atomic操作が最速(Mutexの約4倍)
  • チャネルはオーバーヘッドが大きいが、通信に適している
  • 単純なカウンターにはatomic、複雑な状態管理にはMutexを使用
  • ---

    まとめ

    プロダクショングレードの並行処理実装では以下が重要です:

  • 適切な同期プリミティブの選択: Mutex、RWMutex、atomic、チャネル
  • リソース管理: ゴルーチンリークの防止、適切なクリーンアップ
  • エラーハンドリング: リトライロジック、コンテキストによるキャンセル
  • 監視と統計: メトリクスの収集、パフォーマンスの測定
  • テスト: -raceフラグでの競合検出、ベンチマークの実施