Day 5: チャネル - 解答例

課題1: パイプラインパターンの実装

基本解法

package main

import "fmt"

// ジェネレーター: 数値を生成してチャネルに送信
func generator(nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for _, n := range nums {
            out <- n
        }
    }()
    return out
}

// 2乗計算: 入力チャネルから受信し、2乗してチャネルに送信
func square(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for n := range in {
            out <- n * n
        }
    }()
    return out
}

// フィルター: 閾値より大きい値のみを通過
func filter(in <-chan int, threshold int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for n := range in {
            if n > threshold {
                out <- n
            }
        }
    }()
    return out
}

func main() {
    // パイプライン構築
    nums := generator(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
    squared := square(nums)
    filtered := filter(squared, 50)

    // 結果を出力
    for result := range filtered {
        fmt.Println(result)  // 64, 81, 100
    }
}

解法2: context による キャンセル対応版

package main

import (
    "context"
    "fmt"
)

func generatorWithContext(ctx context.Context, nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for _, n := range nums {
            select {
            case out <- n:
            case <-ctx.Done():
                return  // キャンセル時は即座に終了
            }
        }
    }()
    return out
}

func squareWithContext(ctx context.Context, in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for {
            select {
            case n, ok := <-in:
                if !ok {
                    return
                }
                select {
                case out <- n * n:
                case <-ctx.Done():
                    return
                }
            case <-ctx.Done():
                return
            }
        }
    }()
    return out
}

func filterWithContext(ctx context.Context, in <-chan int, threshold int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for {
            select {
            case n, ok := <-in:
                if !ok {
                    return
                }
                if n > threshold {
                    select {
                    case out <- n:
                    case <-ctx.Done():
                        return
                    }
                }
            case <-ctx.Done():
                return
            }
        }
    }()
    return out
}

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    nums := generatorWithContext(ctx, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
    squared := squareWithContext(ctx, nums)
    filtered := filterWithContext(ctx, squared, 50)

    for result := range filtered {
        fmt.Println(result)
    }
}

---

課題2: Fan-out / Fan-in パターン

基本解法

package main

import (
    "fmt"
    "sync"
    "time"
)

// ジョブ定義
type Job struct {
    ID   int
    Data string
}

// 結果定義
type Result struct {
    JobID int
    Value int
}

// ワーカー関数
func worker(id int, jobs <-chan Job, results chan<- Result, wg *sync.WaitGroup) {
    defer wg.Done()
    for job := range jobs {
        // 重い処理をシミュレート
        time.Sleep(100 * time.Millisecond)
        results <- Result{
            JobID: job.ID,
            Value: len(job.Data) * 2,
        }
    }
}

func main() {
    const numJobs = 20
    const numWorkers = 5

    jobs := make(chan Job, numJobs)
    results := make(chan Result, numJobs)

    // Fan-out: ワーカーを起動
    var wg sync.WaitGroup
    for w := 1; w <= numWorkers; w++ {
        wg.Add(1)
        go worker(w, jobs, results, &wg)
    }

    // ジョブを送信
    go func() {
        for i := 1; i <= numJobs; i++ {
            jobs <- Job{
                ID:   i,
                Data: fmt.Sprintf("Task-%d", i),
            }
        }
        close(jobs)
    }()

    // ワーカーの完了を待ってから results をクローズ
    go func() {
        wg.Wait()
        close(results)
    }()

    // Fan-in: 結果を集約
    for result := range results {
        fmt.Printf("Job %d: Value = %d\n", result.JobID, result.Value)
    }
}

解法2: エラーハンドリング付き

package main

import (
    "errors"
    "fmt"
    "sync"
    "time"
)

type Job struct {
    ID   int
    Data string
}

type Result struct {
    JobID int
    Value int
    Err   error
}

func worker(id int, jobs <-chan Job, results chan<- Result, wg *sync.WaitGroup) {
    defer wg.Done()
    for job := range jobs {
        // エラーをシミュレート(10%の確率)
        if job.ID%10 == 0 {
            results <- Result{
                JobID: job.ID,
                Err:   errors.New("処理エラー"),
            }
            continue
        }

        time.Sleep(50 * time.Millisecond)
        results <- Result{
            JobID: job.ID,
            Value: len(job.Data) * 2,
        }
    }
}

func main() {
    const numJobs = 20
    const numWorkers = 5

    jobs := make(chan Job, numJobs)
    results := make(chan Result, numJobs)

    var wg sync.WaitGroup
    for w := 1; w <= numWorkers; w++ {
        wg.Add(1)
        go worker(w, jobs, results, &wg)
    }

    go func() {
        for i := 1; i <= numJobs; i++ {
            jobs <- Job{ID: i, Data: fmt.Sprintf("Task-%d", i)}
        }
        close(jobs)
    }()

    go func() {
        wg.Wait()
        close(results)
    }()

    // 結果処理(エラーカウント)
    successCount := 0
    errorCount := 0

    for result := range results {
        if result.Err != nil {
            errorCount++
            fmt.Printf("Job %d: エラー - %v\n", result.JobID, result.Err)
        } else {
            successCount++
            fmt.Printf("Job %d: 成功 - Value = %d\n", result.JobID, result.Value)
        }
    }

    fmt.Printf("\n処理完了: 成功 %d, エラー %d\n", successCount, errorCount)
}

---

課題3: Worker Pool パターン

基本解法

package main

import (
    "fmt"
    "time"
)

type Task struct {
    ID   int
    Name string
}

type TaskResult struct {
    Task   Task
    Status string
}

// ワーカープール実装
type WorkerPool struct {
    tasks   chan Task
    results chan TaskResult
    workers int
}

func NewWorkerPool(workers int) *WorkerPool {
    return &WorkerPool{
        tasks:   make(chan Task, 100),
        results: make(chan TaskResult, 100),
        workers: workers,
    }
}

func (wp *WorkerPool) Start() {
    for i := 0; i < wp.workers; i++ {
        go wp.worker(i)
    }
}

func (wp *WorkerPool) worker(id int) {
    for task := range wp.tasks {
        // タスク処理
        time.Sleep(100 * time.Millisecond)
        wp.results <- TaskResult{
            Task:   task,
            Status: fmt.Sprintf("Worker %d が処理完了", id),
        }
    }
}

func (wp *WorkerPool) Submit(task Task) {
    wp.tasks <- task
}

func (wp *WorkerPool) Close() {
    close(wp.tasks)
}

func (wp *WorkerPool) Results() <-chan TaskResult {
    return wp.results
}

func main() {
    pool := NewWorkerPool(5)
    pool.Start()

    // タスクを送信
    go func() {
        for i := 1; i <= 20; i++ {
            pool.Submit(Task{
                ID:   i,
                Name: fmt.Sprintf("Task-%d", i),
            })
        }
        pool.Close()
    }()

    // 結果を受信
    count := 0
    for result := range pool.Results() {
        fmt.Printf("[%s] %s\n", result.Task.Name, result.Status)
        count++
        if count == 20 {
            break
        }
    }
}

解法2: グレースフルシャットダウン対応版

package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

type Task struct {
    ID   int
    Name string
}

type TaskResult struct {
    Task   Task
    Status string
    Err    error
}

type WorkerPool struct {
    tasks      chan Task
    results    chan TaskResult
    numWorkers int
    wg         sync.WaitGroup
}

func NewWorkerPool(numWorkers int) *WorkerPool {
    return &WorkerPool{
        tasks:      make(chan Task, 100),
        results:    make(chan TaskResult, 100),
        numWorkers: numWorkers,
    }
}

func (wp *WorkerPool) Start(ctx context.Context) {
    for i := 0; i < wp.numWorkers; i++ {
        wp.wg.Add(1)
        go wp.worker(ctx, i)
    }

    // すべてのワーカーが終了したら results をクローズ
    go func() {
        wp.wg.Wait()
        close(wp.results)
    }()
}

func (wp *WorkerPool) worker(ctx context.Context, id int) {
    defer wp.wg.Done()

    for {
        select {
        case task, ok := <-wp.tasks:
            if !ok {
                fmt.Printf("Worker %d: タスクチャネルがクローズされました\n", id)
                return
            }
            wp.processTask(ctx, id, task)
        case <-ctx.Done():
            fmt.Printf("Worker %d: キャンセルされました\n", id)
            return
        }
    }
}

func (wp *WorkerPool) processTask(ctx context.Context, workerID int, task Task) {
    // タイムアウト付き処理
    processDone := make(chan struct{})
    go func() {
        time.Sleep(100 * time.Millisecond)
        close(processDone)
    }()

    select {
    case <-processDone:
        wp.results <- TaskResult{
            Task:   task,
            Status: fmt.Sprintf("Worker %d が処理完了", workerID),
        }
    case <-ctx.Done():
        wp.results <- TaskResult{
            Task: task,
            Err:  ctx.Err(),
        }
    }
}

func (wp *WorkerPool) Submit(task Task) {
    wp.tasks <- task
}

func (wp *WorkerPool) Close() {
    close(wp.tasks)
}

func (wp *WorkerPool) Results() <-chan TaskResult {
    return wp.results
}

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    pool := NewWorkerPool(5)
    pool.Start(ctx)

    // タスクを送信
    go func() {
        for i := 1; i <= 20; i++ {
            pool.Submit(Task{
                ID:   i,
                Name: fmt.Sprintf("Task-%d", i),
            })
        }
        pool.Close()
    }()

    // 結果を受信
    for result := range pool.Results() {
        if result.Err != nil {
            fmt.Printf("[%s] エラー: %v\n", result.Task.Name, result.Err)
        } else {
            fmt.Printf("[%s] %s\n", result.Task.Name, result.Status)
        }
    }

    fmt.Println("すべてのタスクが完了しました")
}

---

課題4: Semaphore パターン(レート制限)

基本解法

package main

import (
    "fmt"
    "time"
)

// セマフォ実装
type Semaphore struct {
    sem chan struct{}
}

func NewSemaphore(maxConcurrent int) *Semaphore {
    return &Semaphore{
        sem: make(chan struct{}, maxConcurrent),
    }
}

func (s *Semaphore) Acquire() {
    s.sem <- struct{}{}
}

func (s *Semaphore) Release() {
    <-s.sem
}

func main() {
    const maxConcurrent = 3
    sem := NewSemaphore(maxConcurrent)

    for i := 1; i <= 10; i++ {
        sem.Acquire()
        go func(id int) {
            defer sem.Release()
            fmt.Printf("タスク %d 開始\n", id)
            time.Sleep(2 * time.Second)
            fmt.Printf("タスク %d 完了\n", id)
        }(i)
    }

    // すべてのタスクが完了するまで待つ
    time.Sleep(10 * time.Second)
}

解法2: context.Context 統合版

package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

type Semaphore struct {
    sem chan struct{}
}

func NewSemaphore(maxConcurrent int) *Semaphore {
    return &Semaphore{
        sem: make(chan struct{}, maxConcurrent),
    }
}

func (s *Semaphore) AcquireWithContext(ctx context.Context) error {
    select {
    case s.sem <- struct{}{}:
        return nil
    case <-ctx.Done():
        return ctx.Err()
    }
}

func (s *Semaphore) Release() {
    <-s.sem
}

func processTask(ctx context.Context, id int, sem *Semaphore, wg *sync.WaitGroup) {
    defer wg.Done()

    // セマフォ取得
    if err := sem.AcquireWithContext(ctx); err != nil {
        fmt.Printf("タスク %d: セマフォ取得失敗 - %v\n", id, err)
        return
    }
    defer sem.Release()

    fmt.Printf("タスク %d 開始\n", id)

    // 処理(キャンセル可能)
    timer := time.NewTimer(2 * time.Second)
    defer timer.Stop()

    select {
    case <-timer.C:
        fmt.Printf("タスク %d 完了\n", id)
    case <-ctx.Done():
        fmt.Printf("タスク %d キャンセル\n", id)
    }
}

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 8*time.Second)
    defer cancel()

    const maxConcurrent = 3
    sem := NewSemaphore(maxConcurrent)

    var wg sync.WaitGroup
    for i := 1; i <= 10; i++ {
        wg.Add(1)
        go processTask(ctx, i, sem, &wg)
    }

    wg.Wait()
    fmt.Println("すべてのタスクが完了しました")
}

---

課題5: タイムアウト付き select

基本解法

package main

import (
    "fmt"
    "time"
)

func fetchData(url string, ch chan<- string) {
    // 外部API呼び出しをシミュレート
    time.Sleep(3 * time.Second)
    ch <- fmt.Sprintf("Data from %s", url)
}

func main() {
    ch := make(chan string)

    go fetchData("https://api.example.com", ch)

    select {
    case data := <-ch:
        fmt.Println("成功:", data)
    case <-time.After(2 * time.Second):
        fmt.Println("タイムアウト: APIレスポンスが遅すぎます")
    }
}

解法2: 複数API並行呼び出し版

package main

import (
    "fmt"
    "time"
)

type APIResponse struct {
    URL  string
    Data string
    Err  error
}

func fetchDataWithTimeout(url string, timeout time.Duration) (string, error) {
    ch := make(chan APIResponse, 1)

    go func() {
        // API呼び出しシミュレート(ランダムな遅延)
        delay := time.Duration(1+url[len(url)-1]%3) * time.Second
        time.Sleep(delay)
        ch <- APIResponse{
            URL:  url,
            Data: fmt.Sprintf("Response from %s", url),
        }
    }()

    select {
    case resp := <-ch:
        return resp.Data, resp.Err
    case <-time.After(timeout):
        return "", fmt.Errorf("timeout after %v", timeout)
    }
}

func main() {
    urls := []string{
        "https://api1.example.com",
        "https://api2.example.com",
        "https://api3.example.com",
    }

    type result struct {
        url  string
        data string
        err  error
    }

    results := make(chan result, len(urls))

    // 並行API呼び出し
    for _, url := range urls {
        go func(u string) {
            data, err := fetchDataWithTimeout(u, 2*time.Second)
            results <- result{url: u, data: data, err: err}
        }(url)
    }

    // 結果を収集
    for i := 0; i < len(urls); i++ {
        res := <-results
        if res.err != nil {
            fmt.Printf("[%s] エラー: %v\n", res.url, res.err)
        } else {
            fmt.Printf("[%s] 成功: %s\n", res.url, res.data)
        }
    }
}

---

課題6: Done チャネルパターン

基本解法

package main

import (
    "fmt"
    "time"
)

func worker(done <-chan struct{}, name string) {
    for {
        select {
        case <-done:
            fmt.Printf("%s: 終了します\n", name)
            return
        default:
            fmt.Printf("%s: 作業中...\n", name)
            time.Sleep(500 * time.Millisecond)
        }
    }
}

func main() {
    done := make(chan struct{})

    go worker(done, "Worker 1")
    go worker(done, "Worker 2")
    go worker(done, "Worker 3")

    time.Sleep(3 * time.Second)

    fmt.Println("シャットダウン開始...")
    close(done)  // すべてのワーカーに終了を通知

    time.Sleep(time.Second)
    fmt.Println("シャットダウン完了")
}

解法2: グレースフルシャットダウン with WaitGroup

package main

import (
    "fmt"
    "sync"
    "time"
)

func worker(done <-chan struct{}, name string, wg *sync.WaitGroup) {
    defer wg.Done()

    for {
        select {
        case <-done:
            fmt.Printf("%s: クリーンアップ処理中...\n", name)
            time.Sleep(500 * time.Millisecond)  // クリーンアップ
            fmt.Printf("%s: 終了しました\n", name)
            return
        default:
            fmt.Printf("%s: 作業中...\n", name)
            time.Sleep(500 * time.Millisecond)
        }
    }
}

func main() {
    done := make(chan struct{})
    var wg sync.WaitGroup

    workers := []string{"Worker 1", "Worker 2", "Worker 3"}
    for _, name := range workers {
        wg.Add(1)
        go worker(done, name, &wg)
    }

    time.Sleep(3 * time.Second)

    fmt.Println("シャットダウン開始...")
    close(done)

    // すべてのワーカーが終了するまで待つ
    wg.Wait()
    fmt.Println("すべてのワーカーが正常に終了しました")
}

---

ベストプラクティスまとめ

1. チャネルの方向性を明示する

// 良い例
func producer(out chan<- int) {  // 送信専用
    out <- 42
}

func consumer(in <-chan int) {  // 受信専用
    val := <-in
}

2. 送信側でクローズする

// 良い例
func generateNumbers(max int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)  // 必ずクローズ
        for i := 0; i < max; i++ {
            out <- i
        }
    }()
    return out
}

3. context でキャンセル可能にする

// 良い例
func workWithContext(ctx context.Context) error {
    resultCh := make(chan int)

    go func() {
        // 重い処理
        time.Sleep(5 * time.Second)
        resultCh <- 42
    }()

    select {
    case result := <-resultCh:
        fmt.Println("結果:", result)
        return nil
    case <-ctx.Done():
        return ctx.Err()
    }
}

4. バッファサイズは慎重に選択

// 悪い例
ch := make(chan int, 10000)  // 過剰なバッファ

// 良い例
ch := make(chan int, numWorkers)  // ワーカー数に基づく

5. エラーハンドリングを忘れない

type Result struct {
    Value int
    Err   error
}

func processWithError(data int) Result {
    if data < 0 {
        return Result{Err: errors.New("負の値は処理できません")}
    }
    return Result{Value: data * 2}
}

---

パフォーマンス最適化のヒント

1. チャネルのバッファリング

// 遅い
unbuffered := make(chan int)

// 速い(適切なサイズ)
buffered := make(chan int, 100)

2. ゴルーチンプール

// 悪い例: ゴルーチンを大量作成
for i := 0; i < 10000; i++ {
    go process(i)
}

// 良い例: ワーカープール
for i := 0; i < numCPU; i++ {
    go worker(jobs, results)
}

3. select のデフォルトケース

// 非ブロッキング受信
select {
case data := <-ch:
    process(data)
default:
    // チャネルが空の場合の処理
}

---

まとめ

今日学んだ解法パターン:

  • Pipeline: データ変換の連鎖
  • Fan-out/Fan-in: 並行処理と集約
  • Worker Pool: リソース制限付き並行処理
  • Semaphore: 同時実行数の制御
  • Timeout: タイムアウト制御
  • Done Channel: グレースフルシャットダウン

すべてのパターンに共通する重要なポイント:

  • チャネルは送信側でクローズ
  • context でキャンセル可能に
  • エラーハンドリングを忘れない
  • リソースリークに注意

次のDay 6では、これらのパターンを組み合わせた高度な実装を学びます。