課題14: パイプラインパターンの実装
課題概要
この課題では、チャネルを使ったデータ処理パイプラインを実装します。複数のステージを持つパイプライン、ファンアウト/ファンイン、select文による制御を通じて、チャネルの実践的な使い方を習得します。
マンダトリー要件
要件1: 基本パイプラインの実装(25点)
数値処理パイプラインを実装してください。
ファイル: pipeline.go
package pipeline
// Generator は数値を生成するステージ
func Generator(nums ...int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for _, n := range nums {
out <- n
}
}()
return out
}
// Square は数値を2乗するステージ
func Square(in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
out <- n * n
}
}()
return out
}
// Filter は条件を満たす数値のみを通すステージ
func Filter(in <-chan int, predicate func(int) bool) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
if predicate(n) {
out <- n
}
}
}()
return out
}
// Sum は数値を合計するステージ
func Sum(in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
sum := 0
for n := range in {
sum += n
}
out <- sum
}()
return out
}
// Multiply は数値を定数倍するステージ
func Multiply(in <-chan int, factor int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
out <- n * factor
}
}()
return out
}
実装すべき内容:
Generator: 数値を生成Square: 2乗Filter: 条件フィルタリングSum: 合計Multiply: 定数倍- すべてのステージでチャネルを適切に閉じる
要件2: ファンアウト/ファンインの実装(30点)
並行処理のためのファンアウト/ファンインパターンを実装してください。
ファイル: fanout.go
package pipeline
import (
"sync"
"time"
)
// HeavyWork は重い処理をシミュレート
type HeavyWork struct {
ID int
Input int
Output int
}
// FanOut は入力を複数のワーカーに分配
func FanOut(in <-chan int, workers int, process func(int) int) []<-chan int {
channels := make([]<-chan int, workers)
for i := 0; i < workers; i++ {
out := make(chan int)
channels[i] = out
go func(ch chan<- int) {
defer close(ch)
for num := range in {
// 重い処理のシミュレーション
time.Sleep(100 * time.Millisecond)
result := process(num)
ch <- result
}
}(out)
}
return channels
}
// FanIn は複数のチャネルを1つに集約
func FanIn(channels ...<-chan int) <-chan int {
out := make(chan int)
var wg sync.WaitGroup
for _, ch := range channels {
wg.Add(1)
go func(c <-chan int) {
defer wg.Done()
for num := range c {
out <- num
}
}(ch)
}
go func() {
wg.Wait()
close(out)
}()
return out
}
// FanOutFanIn はファンアウト/ファンインを組み合わせた処理
func FanOutFanIn(in <-chan int, workers int, process func(int) int) <-chan int {
workerChannels := FanOut(in, workers, process)
return FanIn(workerChannels...)
}
実装すべき内容:
FanOut: 入力を複数ワーカーに分配FanIn: 複数チャネルを1つに集約FanOutFanIn: 組み合わせた処理- sync.WaitGroupで同期
要件3: select文を使った制御(25点)
タイムアウト、キャンセル、複数チャネル操作を実装してください。
ファイル: select.go
package pipeline
import (
"context"
"fmt"
"time"
)
// ProcessWithTimeout はタイムアウト付きで処理
func ProcessWithTimeout(in <-chan int, timeout time.Duration) (<-chan int, <-chan error) {
out := make(chan int)
errCh := make(chan error, 1)
go func() {
defer close(out)
defer close(errCh)
for {
select {
case num, ok := <-in:
if !ok {
return
}
out <- num * 2
case <-time.After(timeout):
errCh <- fmt.Errorf("タイムアウト")
return
}
}
}()
return out, errCh
}
// ProcessWithCancel はキャンセル可能な処理
func ProcessWithCancel(ctx context.Context, in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for {
select {
case num, ok := <-in:
if !ok {
return
}
select {
case out <- num * 2:
case <-ctx.Done():
return
}
case <-ctx.Done():
return
}
}
}()
return out
}
// Merge は複数のチャネルを1つにマージ
func Merge(channels ...<-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
// nilチャネルでselectを無効化するテクニック
cases := make([]<-chan int, len(channels))
copy(cases, channels)
for {
allNil := true
for _, ch := range cases {
if ch != nil {
allNil = false
break
}
}
if allNil {
break
}
// select文を動的に構築
for i, ch := range cases {
if ch != nil {
select {
case val, ok := <-ch:
if !ok {
cases[i] = nil
} else {
out <- val
}
default:
}
}
}
}
}()
return out
}
// OrDone はコンテキストまたはチャネルの終了を待つ
func OrDone(ctx context.Context, in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for {
select {
case val, ok := <-in:
if !ok {
return
}
select {
case out <- val:
case <-ctx.Done():
return
}
case <-ctx.Done():
return
}
}
}()
return out
}
実装すべき内容:
ProcessWithTimeout: タイムアウト制御ProcessWithCancel: コンテキストでキャンセルMerge: 複数チャネルのマージOrDone: コンテキストとチャネルの統合
要件4: 統合例(20点)
実装した機能を組み合わせたメインプログラムを作成してください。
ファイル: main.go
package main
import (
"context"
"fmt"
"time"
"yourmodule/pipeline"
)
func main() {
testBasicPipeline()
fmt.Println()
testFanOutFanIn()
fmt.Println()
testSelectControl()
}
func testBasicPipeline() {
fmt.Println("=== 基本パイプライン ===")
// 1から10までの数値を生成
nums := pipeline.Generator(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
// 2乗
squared := pipeline.Square(nums)
// 偶数のみフィルタ
evens := pipeline.Filter(squared, func(n int) bool {
return n%2 == 0
})
// 結果の出力
fmt.Print("偶数の2乗: ")
for num := range evens {
fmt.Printf("%d ", num)
}
fmt.Println()
// 合計の計算
nums2 := pipeline.Generator(1, 2, 3, 4, 5)
squared2 := pipeline.Square(nums2)
sum := pipeline.Sum(squared2)
total := <-sum
fmt.Printf("1²+2²+3²+4²+5² = %d\n", total)
}
func testFanOutFanIn() {
fmt.Println("=== ファンアウト/ファンイン ===")
// 入力データ
input := make(chan int)
go func() {
defer close(input)
for i := 1; i <= 10; i++ {
input <- i
}
}()
// 3つのワーカーで並行処理
start := time.Now()
result := pipeline.FanOutFanIn(input, 3, func(n int) int {
return n * n
})
// 結果の出力
results := []int{}
for num := range result {
results = append(results, num)
}
elapsed := time.Since(start)
fmt.Printf("処理結果: %v\n", results)
fmt.Printf("所要時間: %v\n", elapsed)
}
func testSelectControl() {
fmt.Println("=== select制御 ===")
// タイムアウトテスト
input := make(chan int)
go func() {
defer close(input)
for i := 1; i <= 5; i++ {
input <- i
time.Sleep(100 * time.Millisecond)
}
}()
out, errCh := pipeline.ProcessWithTimeout(input, 1*time.Second)
fmt.Print("タイムアウト付き処理: ")
for num := range out {
fmt.Printf("%d ", num)
}
fmt.Println()
if err := <-errCh; err != nil {
fmt.Println("エラー:", err)
}
// キャンセルテスト
ctx, cancel := context.WithCancel(context.Background())
input2 := make(chan int)
go func() {
defer close(input2)
for i := 1; i <= 10; i++ {
input2 <- i
time.Sleep(50 * time.Millisecond)
}
}()
result := pipeline.ProcessWithCancel(ctx, input2)
go func() {
time.Sleep(200 * time.Millisecond)
cancel() // 途中でキャンセル
}()
fmt.Print("キャンセル可能な処理: ")
for num := range result {
fmt.Printf("%d ", num)
}
fmt.Println()
}
実装すべき内容:
- 基本パイプラインのテスト
- ファンアウト/ファンインのテスト
- select制御のテスト
- 結果の表示
期待される出力
=== 基本パイプライン ===
偶数の2乗: 4 16 36 64 100
1²+2²+3²+4²+5² = 55
=== ファンアウト/ファンイン ===
処理結果: [1 4 9 16 25 36 49 64 81 100]
所要時間: 400ms
=== select制御 ===
タイムアウト付き処理: 2 4 6 8 10
キャンセル可能な処理: 2 4 6 8
ボーナス課題
> ボーナス: これらはオプションです。マンダトリー部分が完了してから取り組んでください。
ボーナス1: バッファ付きパイプライン(10点)
バッファサイズを調整可能なパイプラインを実装してください。
func BufferedPipeline(bufSize int, stages ...func(<-chan int) <-chan int) func(<-chan int) <-chan int {
return func(in <-chan int) <-chan int {
out := in
for _, stage := range stages {
buffered := make(chan int, bufSize)
go func(input <-chan int, output chan<- int) {
defer close(output)
for v := range input {
output <- v
}
}(out, buffered)
out = stage(buffered)
}
return out
}
}
ボーナス2: エラーハンドリング付きパイプライン(5点)
各ステージでエラーを伝播できるパイプラインを実装してください。
type Result struct {
Value int
Err error
}
func SafeSquare(in <-chan int) <-chan Result {
out := make(chan Result)
go func() {
defer close(out)
for n := range in {
if n < 0 {
out <- Result{Err: fmt.Errorf("負の数は処理できません: %d", n)}
} else {
out <- Result{Value: n * n}
}
}
}()
return out
}
ボーナス3: 動的ワーカー調整(5点)
負荷に応じてワーカー数を動的に調整する機能を実装してください。
type DynamicPool struct {
minWorkers int
maxWorkers int
current int
mu sync.Mutex
}
func (p *DynamicPool) AdjustWorkers(queueSize int)
評価基準
| 項目 | 配点 | 詳細 |
|---|---|---|
| 基本パイプライン | 25点 | すべてのステージが正しく動作する |
| ファンアウト/ファンイン | 30点 | 並行処理が正しく実装されている |
| select制御 | 25点 | タイムアウト、キャンセルが動作する |
| メインプログラム | 20点 | 全機能を統合してテストしている |
| **ボーナス1** | 10点 | バッファサイズが調整可能 |
| **ボーナス2** | 5点 | エラー処理が適切に実装されている |
| **ボーナス3** | 5点 | 動的ワーカー調整が動作する |
提出方法
以下のファイルを提出してください:
submission/
├── go.mod
├── pipeline.go # 基本パイプライン
├── fanout.go # ファンアウト/ファンイン
├── select.go # select制御
├── main.go # メインプログラム
└── bonus/ # ボーナス課題(オプション)
├── buffered.go # バッファ付き
├── safe.go # エラーハンドリング
└── dynamic.go # 動的ワーカー
ヒント
defer close(ch)を使う