課題14: パイプラインパターンの実装

課題概要

この課題では、チャネルを使ったデータ処理パイプラインを実装します。複数のステージを持つパイプライン、ファンアウト/ファンイン、select文による制御を通じて、チャネルの実践的な使い方を習得します。

マンダトリー要件

要件1: 基本パイプラインの実装(25点)

数値処理パイプラインを実装してください。

ファイル: pipeline.go

package pipeline

// Generator は数値を生成するステージ
func Generator(nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for _, n := range nums {
            out <- n
        }
    }()
    return out
}

// Square は数値を2乗するステージ
func Square(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for n := range in {
            out <- n * n
        }
    }()
    return out
}

// Filter は条件を満たす数値のみを通すステージ
func Filter(in <-chan int, predicate func(int) bool) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for n := range in {
            if predicate(n) {
                out <- n
            }
        }
    }()
    return out
}

// Sum は数値を合計するステージ
func Sum(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        sum := 0
        for n := range in {
            sum += n
        }
        out <- sum
    }()
    return out
}

// Multiply は数値を定数倍するステージ
func Multiply(in <-chan int, factor int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for n := range in {
            out <- n * factor
        }
    }()
    return out
}

実装すべき内容

  • Generator: 数値を生成
  • Square: 2乗
  • Filter: 条件フィルタリング
  • Sum: 合計
  • Multiply: 定数倍
  • すべてのステージでチャネルを適切に閉じる

要件2: ファンアウト/ファンインの実装(30点)

並行処理のためのファンアウト/ファンインパターンを実装してください。

ファイル: fanout.go

package pipeline

import (
    "sync"
    "time"
)

// HeavyWork は重い処理をシミュレート
type HeavyWork struct {
    ID     int
    Input  int
    Output int
}

// FanOut は入力を複数のワーカーに分配
func FanOut(in <-chan int, workers int, process func(int) int) []<-chan int {
    channels := make([]<-chan int, workers)

    for i := 0; i < workers; i++ {
        out := make(chan int)
        channels[i] = out

        go func(ch chan<- int) {
            defer close(ch)
            for num := range in {
                // 重い処理のシミュレーション
                time.Sleep(100 * time.Millisecond)
                result := process(num)
                ch <- result
            }
        }(out)
    }

    return channels
}

// FanIn は複数のチャネルを1つに集約
func FanIn(channels ...<-chan int) <-chan int {
    out := make(chan int)
    var wg sync.WaitGroup

    for _, ch := range channels {
        wg.Add(1)
        go func(c <-chan int) {
            defer wg.Done()
            for num := range c {
                out <- num
            }
        }(ch)
    }

    go func() {
        wg.Wait()
        close(out)
    }()

    return out
}

// FanOutFanIn はファンアウト/ファンインを組み合わせた処理
func FanOutFanIn(in <-chan int, workers int, process func(int) int) <-chan int {
    workerChannels := FanOut(in, workers, process)
    return FanIn(workerChannels...)
}

実装すべき内容

  • FanOut: 入力を複数ワーカーに分配
  • FanIn: 複数チャネルを1つに集約
  • FanOutFanIn: 組み合わせた処理
  • sync.WaitGroupで同期

要件3: select文を使った制御(25点)

タイムアウト、キャンセル、複数チャネル操作を実装してください。

ファイル: select.go

package pipeline

import (
    "context"
    "fmt"
    "time"
)

// ProcessWithTimeout はタイムアウト付きで処理
func ProcessWithTimeout(in <-chan int, timeout time.Duration) (<-chan int, <-chan error) {
    out := make(chan int)
    errCh := make(chan error, 1)

    go func() {
        defer close(out)
        defer close(errCh)

        for {
            select {
            case num, ok := <-in:
                if !ok {
                    return
                }
                out <- num * 2
            case <-time.After(timeout):
                errCh <- fmt.Errorf("タイムアウト")
                return
            }
        }
    }()

    return out, errCh
}

// ProcessWithCancel はキャンセル可能な処理
func ProcessWithCancel(ctx context.Context, in <-chan int) <-chan int {
    out := make(chan int)

    go func() {
        defer close(out)

        for {
            select {
            case num, ok := <-in:
                if !ok {
                    return
                }
                select {
                case out <- num * 2:
                case <-ctx.Done():
                    return
                }
            case <-ctx.Done():
                return
            }
        }
    }()

    return out
}

// Merge は複数のチャネルを1つにマージ
func Merge(channels ...<-chan int) <-chan int {
    out := make(chan int)

    go func() {
        defer close(out)

        // nilチャネルでselectを無効化するテクニック
        cases := make([]<-chan int, len(channels))
        copy(cases, channels)

        for {
            allNil := true
            for _, ch := range cases {
                if ch != nil {
                    allNil = false
                    break
                }
            }

            if allNil {
                break
            }

            // select文を動的に構築
            for i, ch := range cases {
                if ch != nil {
                    select {
                    case val, ok := <-ch:
                        if !ok {
                            cases[i] = nil
                        } else {
                            out <- val
                        }
                    default:
                    }
                }
            }
        }
    }()

    return out
}

// OrDone はコンテキストまたはチャネルの終了を待つ
func OrDone(ctx context.Context, in <-chan int) <-chan int {
    out := make(chan int)

    go func() {
        defer close(out)

        for {
            select {
            case val, ok := <-in:
                if !ok {
                    return
                }
                select {
                case out <- val:
                case <-ctx.Done():
                    return
                }
            case <-ctx.Done():
                return
            }
        }
    }()

    return out
}

実装すべき内容

  • ProcessWithTimeout: タイムアウト制御
  • ProcessWithCancel: コンテキストでキャンセル
  • Merge: 複数チャネルのマージ
  • OrDone: コンテキストとチャネルの統合

要件4: 統合例(20点)

実装した機能を組み合わせたメインプログラムを作成してください。

ファイル: main.go

package main

import (
    "context"
    "fmt"
    "time"

    "yourmodule/pipeline"
)

func main() {
    testBasicPipeline()
    fmt.Println()
    testFanOutFanIn()
    fmt.Println()
    testSelectControl()
}

func testBasicPipeline() {
    fmt.Println("=== 基本パイプライン ===")

    // 1から10までの数値を生成
    nums := pipeline.Generator(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

    // 2乗
    squared := pipeline.Square(nums)

    // 偶数のみフィルタ
    evens := pipeline.Filter(squared, func(n int) bool {
        return n%2 == 0
    })

    // 結果の出力
    fmt.Print("偶数の2乗: ")
    for num := range evens {
        fmt.Printf("%d ", num)
    }
    fmt.Println()

    // 合計の計算
    nums2 := pipeline.Generator(1, 2, 3, 4, 5)
    squared2 := pipeline.Square(nums2)
    sum := pipeline.Sum(squared2)

    total := <-sum
    fmt.Printf("1²+2²+3²+4²+5² = %d\n", total)
}

func testFanOutFanIn() {
    fmt.Println("=== ファンアウト/ファンイン ===")

    // 入力データ
    input := make(chan int)
    go func() {
        defer close(input)
        for i := 1; i <= 10; i++ {
            input <- i
        }
    }()

    // 3つのワーカーで並行処理
    start := time.Now()
    result := pipeline.FanOutFanIn(input, 3, func(n int) int {
        return n * n
    })

    // 結果の出力
    results := []int{}
    for num := range result {
        results = append(results, num)
    }

    elapsed := time.Since(start)
    fmt.Printf("処理結果: %v\n", results)
    fmt.Printf("所要時間: %v\n", elapsed)
}

func testSelectControl() {
    fmt.Println("=== select制御 ===")

    // タイムアウトテスト
    input := make(chan int)
    go func() {
        defer close(input)
        for i := 1; i <= 5; i++ {
            input <- i
            time.Sleep(100 * time.Millisecond)
        }
    }()

    out, errCh := pipeline.ProcessWithTimeout(input, 1*time.Second)
    fmt.Print("タイムアウト付き処理: ")
    for num := range out {
        fmt.Printf("%d ", num)
    }
    fmt.Println()

    if err := <-errCh; err != nil {
        fmt.Println("エラー:", err)
    }

    // キャンセルテスト
    ctx, cancel := context.WithCancel(context.Background())

    input2 := make(chan int)
    go func() {
        defer close(input2)
        for i := 1; i <= 10; i++ {
            input2 <- i
            time.Sleep(50 * time.Millisecond)
        }
    }()

    result := pipeline.ProcessWithCancel(ctx, input2)

    go func() {
        time.Sleep(200 * time.Millisecond)
        cancel() // 途中でキャンセル
    }()

    fmt.Print("キャンセル可能な処理: ")
    for num := range result {
        fmt.Printf("%d ", num)
    }
    fmt.Println()
}

実装すべき内容

  • 基本パイプラインのテスト
  • ファンアウト/ファンインのテスト
  • select制御のテスト
  • 結果の表示
  • 期待される出力

    === 基本パイプライン ===
    偶数の2乗: 4 16 36 64 100
    1²+2²+3²+4²+5² = 55
    
    === ファンアウト/ファンイン ===
    処理結果: [1 4 9 16 25 36 49 64 81 100]
    所要時間: 400ms
    
    === select制御 ===
    タイムアウト付き処理: 2 4 6 8 10
    キャンセル可能な処理: 2 4 6 8
    

    ボーナス課題

    > ボーナス: これらはオプションです。マンダトリー部分が完了してから取り組んでください。

    ボーナス1: バッファ付きパイプライン(10点)

    バッファサイズを調整可能なパイプラインを実装してください。

    func BufferedPipeline(bufSize int, stages ...func(<-chan int) <-chan int) func(<-chan int) <-chan int {
        return func(in <-chan int) <-chan int {
            out := in
            for _, stage := range stages {
                buffered := make(chan int, bufSize)
                go func(input <-chan int, output chan<- int) {
                    defer close(output)
                    for v := range input {
                        output <- v
                    }
                }(out, buffered)
                out = stage(buffered)
            }
            return out
        }
    }
    

    ボーナス2: エラーハンドリング付きパイプライン(5点)

    各ステージでエラーを伝播できるパイプラインを実装してください。

    type Result struct {
        Value int
        Err   error
    }
    
    func SafeSquare(in <-chan int) <-chan Result {
        out := make(chan Result)
        go func() {
            defer close(out)
            for n := range in {
                if n < 0 {
                    out <- Result{Err: fmt.Errorf("負の数は処理できません: %d", n)}
                } else {
                    out <- Result{Value: n * n}
                }
            }
        }()
        return out
    }
    

    ボーナス3: 動的ワーカー調整(5点)

    負荷に応じてワーカー数を動的に調整する機能を実装してください。

    type DynamicPool struct {
        minWorkers int
        maxWorkers int
        current    int
        mu         sync.Mutex
    }
    
    func (p *DynamicPool) AdjustWorkers(queueSize int)
    

    評価基準

    項目 配点 詳細
    基本パイプライン 25点 すべてのステージが正しく動作する
    ファンアウト/ファンイン 30点 並行処理が正しく実装されている
    select制御 25点 タイムアウト、キャンセルが動作する
    メインプログラム 20点 全機能を統合してテストしている
    **ボーナス1** 10点 バッファサイズが調整可能
    **ボーナス2** 5点 エラー処理が適切に実装されている
    **ボーナス3** 5点 動的ワーカー調整が動作する

    提出方法

    以下のファイルを提出してください:

    submission/
    ├── go.mod
    ├── pipeline.go        # 基本パイプライン
    ├── fanout.go          # ファンアウト/ファンイン
    ├── select.go          # select制御
    ├── main.go            # メインプログラム
    └── bonus/            # ボーナス課題(オプション)
        ├── buffered.go   # バッファ付き
        ├── safe.go       # エラーハンドリング
        └── dynamic.go    # 動的ワーカー
    

    ヒント

  • チャネルの閉じ方: 送信側が閉じる、defer close(ch)を使う
  • デッドロック: すべてのgoroutineがブロックしないように注意
  • select: defaultケースで非ブロッキング操作
  • ファンイン: sync.WaitGroupで全ワーカーの完了を待つ
  • コンテキスト: キャンセルや タイムアウトに活用
  • 学習リソース

  • Go Concurrency Patterns: Pipelines and cancellation
  • Go Concurrency Patterns
  • Advanced Go Concurrency Patterns
  • context package documentation