Day 5: チャネル - 解答例
課題1: パイプラインパターンの実装
基本解法
package main
import "fmt"
// ジェネレーター: 数値を生成してチャネルに送信
func generator(nums ...int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for _, n := range nums {
out <- n
}
}()
return out
}
// 2乗計算: 入力チャネルから受信し、2乗してチャネルに送信
func square(in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
out <- n * n
}
}()
return out
}
// フィルター: 閾値より大きい値のみを通過
func filter(in <-chan int, threshold int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
if n > threshold {
out <- n
}
}
}()
return out
}
func main() {
// パイプライン構築
nums := generator(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
squared := square(nums)
filtered := filter(squared, 50)
// 結果を出力
for result := range filtered {
fmt.Println(result) // 64, 81, 100
}
}
解法2: context による キャンセル対応版
package main
import (
"context"
"fmt"
)
func generatorWithContext(ctx context.Context, nums ...int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for _, n := range nums {
select {
case out <- n:
case <-ctx.Done():
return // キャンセル時は即座に終了
}
}
}()
return out
}
func squareWithContext(ctx context.Context, in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for {
select {
case n, ok := <-in:
if !ok {
return
}
select {
case out <- n * n:
case <-ctx.Done():
return
}
case <-ctx.Done():
return
}
}
}()
return out
}
func filterWithContext(ctx context.Context, in <-chan int, threshold int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for {
select {
case n, ok := <-in:
if !ok {
return
}
if n > threshold {
select {
case out <- n:
case <-ctx.Done():
return
}
}
case <-ctx.Done():
return
}
}
}()
return out
}
func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
nums := generatorWithContext(ctx, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
squared := squareWithContext(ctx, nums)
filtered := filterWithContext(ctx, squared, 50)
for result := range filtered {
fmt.Println(result)
}
}
---
課題2: Fan-out / Fan-in パターン
基本解法
package main
import (
"fmt"
"sync"
"time"
)
// ジョブ定義
type Job struct {
ID int
Data string
}
// 結果定義
type Result struct {
JobID int
Value int
}
// ワーカー関数
func worker(id int, jobs <-chan Job, results chan<- Result, wg *sync.WaitGroup) {
defer wg.Done()
for job := range jobs {
// 重い処理をシミュレート
time.Sleep(100 * time.Millisecond)
results <- Result{
JobID: job.ID,
Value: len(job.Data) * 2,
}
}
}
func main() {
const numJobs = 20
const numWorkers = 5
jobs := make(chan Job, numJobs)
results := make(chan Result, numJobs)
// Fan-out: ワーカーを起動
var wg sync.WaitGroup
for w := 1; w <= numWorkers; w++ {
wg.Add(1)
go worker(w, jobs, results, &wg)
}
// ジョブを送信
go func() {
for i := 1; i <= numJobs; i++ {
jobs <- Job{
ID: i,
Data: fmt.Sprintf("Task-%d", i),
}
}
close(jobs)
}()
// ワーカーの完了を待ってから results をクローズ
go func() {
wg.Wait()
close(results)
}()
// Fan-in: 結果を集約
for result := range results {
fmt.Printf("Job %d: Value = %d\n", result.JobID, result.Value)
}
}
解法2: エラーハンドリング付き
package main
import (
"errors"
"fmt"
"sync"
"time"
)
type Job struct {
ID int
Data string
}
type Result struct {
JobID int
Value int
Err error
}
func worker(id int, jobs <-chan Job, results chan<- Result, wg *sync.WaitGroup) {
defer wg.Done()
for job := range jobs {
// エラーをシミュレート(10%の確率)
if job.ID%10 == 0 {
results <- Result{
JobID: job.ID,
Err: errors.New("処理エラー"),
}
continue
}
time.Sleep(50 * time.Millisecond)
results <- Result{
JobID: job.ID,
Value: len(job.Data) * 2,
}
}
}
func main() {
const numJobs = 20
const numWorkers = 5
jobs := make(chan Job, numJobs)
results := make(chan Result, numJobs)
var wg sync.WaitGroup
for w := 1; w <= numWorkers; w++ {
wg.Add(1)
go worker(w, jobs, results, &wg)
}
go func() {
for i := 1; i <= numJobs; i++ {
jobs <- Job{ID: i, Data: fmt.Sprintf("Task-%d", i)}
}
close(jobs)
}()
go func() {
wg.Wait()
close(results)
}()
// 結果処理(エラーカウント)
successCount := 0
errorCount := 0
for result := range results {
if result.Err != nil {
errorCount++
fmt.Printf("Job %d: エラー - %v\n", result.JobID, result.Err)
} else {
successCount++
fmt.Printf("Job %d: 成功 - Value = %d\n", result.JobID, result.Value)
}
}
fmt.Printf("\n処理完了: 成功 %d, エラー %d\n", successCount, errorCount)
}
---
課題3: Worker Pool パターン
基本解法
package main
import (
"fmt"
"time"
)
type Task struct {
ID int
Name string
}
type TaskResult struct {
Task Task
Status string
}
// ワーカープール実装
type WorkerPool struct {
tasks chan Task
results chan TaskResult
workers int
}
func NewWorkerPool(workers int) *WorkerPool {
return &WorkerPool{
tasks: make(chan Task, 100),
results: make(chan TaskResult, 100),
workers: workers,
}
}
func (wp *WorkerPool) Start() {
for i := 0; i < wp.workers; i++ {
go wp.worker(i)
}
}
func (wp *WorkerPool) worker(id int) {
for task := range wp.tasks {
// タスク処理
time.Sleep(100 * time.Millisecond)
wp.results <- TaskResult{
Task: task,
Status: fmt.Sprintf("Worker %d が処理完了", id),
}
}
}
func (wp *WorkerPool) Submit(task Task) {
wp.tasks <- task
}
func (wp *WorkerPool) Close() {
close(wp.tasks)
}
func (wp *WorkerPool) Results() <-chan TaskResult {
return wp.results
}
func main() {
pool := NewWorkerPool(5)
pool.Start()
// タスクを送信
go func() {
for i := 1; i <= 20; i++ {
pool.Submit(Task{
ID: i,
Name: fmt.Sprintf("Task-%d", i),
})
}
pool.Close()
}()
// 結果を受信
count := 0
for result := range pool.Results() {
fmt.Printf("[%s] %s\n", result.Task.Name, result.Status)
count++
if count == 20 {
break
}
}
}
解法2: グレースフルシャットダウン対応版
package main
import (
"context"
"fmt"
"sync"
"time"
)
type Task struct {
ID int
Name string
}
type TaskResult struct {
Task Task
Status string
Err error
}
type WorkerPool struct {
tasks chan Task
results chan TaskResult
numWorkers int
wg sync.WaitGroup
}
func NewWorkerPool(numWorkers int) *WorkerPool {
return &WorkerPool{
tasks: make(chan Task, 100),
results: make(chan TaskResult, 100),
numWorkers: numWorkers,
}
}
func (wp *WorkerPool) Start(ctx context.Context) {
for i := 0; i < wp.numWorkers; i++ {
wp.wg.Add(1)
go wp.worker(ctx, i)
}
// すべてのワーカーが終了したら results をクローズ
go func() {
wp.wg.Wait()
close(wp.results)
}()
}
func (wp *WorkerPool) worker(ctx context.Context, id int) {
defer wp.wg.Done()
for {
select {
case task, ok := <-wp.tasks:
if !ok {
fmt.Printf("Worker %d: タスクチャネルがクローズされました\n", id)
return
}
wp.processTask(ctx, id, task)
case <-ctx.Done():
fmt.Printf("Worker %d: キャンセルされました\n", id)
return
}
}
}
func (wp *WorkerPool) processTask(ctx context.Context, workerID int, task Task) {
// タイムアウト付き処理
processDone := make(chan struct{})
go func() {
time.Sleep(100 * time.Millisecond)
close(processDone)
}()
select {
case <-processDone:
wp.results <- TaskResult{
Task: task,
Status: fmt.Sprintf("Worker %d が処理完了", workerID),
}
case <-ctx.Done():
wp.results <- TaskResult{
Task: task,
Err: ctx.Err(),
}
}
}
func (wp *WorkerPool) Submit(task Task) {
wp.tasks <- task
}
func (wp *WorkerPool) Close() {
close(wp.tasks)
}
func (wp *WorkerPool) Results() <-chan TaskResult {
return wp.results
}
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
pool := NewWorkerPool(5)
pool.Start(ctx)
// タスクを送信
go func() {
for i := 1; i <= 20; i++ {
pool.Submit(Task{
ID: i,
Name: fmt.Sprintf("Task-%d", i),
})
}
pool.Close()
}()
// 結果を受信
for result := range pool.Results() {
if result.Err != nil {
fmt.Printf("[%s] エラー: %v\n", result.Task.Name, result.Err)
} else {
fmt.Printf("[%s] %s\n", result.Task.Name, result.Status)
}
}
fmt.Println("すべてのタスクが完了しました")
}
---
課題4: Semaphore パターン(レート制限)
基本解法
package main
import (
"fmt"
"time"
)
// セマフォ実装
type Semaphore struct {
sem chan struct{}
}
func NewSemaphore(maxConcurrent int) *Semaphore {
return &Semaphore{
sem: make(chan struct{}, maxConcurrent),
}
}
func (s *Semaphore) Acquire() {
s.sem <- struct{}{}
}
func (s *Semaphore) Release() {
<-s.sem
}
func main() {
const maxConcurrent = 3
sem := NewSemaphore(maxConcurrent)
for i := 1; i <= 10; i++ {
sem.Acquire()
go func(id int) {
defer sem.Release()
fmt.Printf("タスク %d 開始\n", id)
time.Sleep(2 * time.Second)
fmt.Printf("タスク %d 完了\n", id)
}(i)
}
// すべてのタスクが完了するまで待つ
time.Sleep(10 * time.Second)
}
解法2: context.Context 統合版
package main
import (
"context"
"fmt"
"sync"
"time"
)
type Semaphore struct {
sem chan struct{}
}
func NewSemaphore(maxConcurrent int) *Semaphore {
return &Semaphore{
sem: make(chan struct{}, maxConcurrent),
}
}
func (s *Semaphore) AcquireWithContext(ctx context.Context) error {
select {
case s.sem <- struct{}{}:
return nil
case <-ctx.Done():
return ctx.Err()
}
}
func (s *Semaphore) Release() {
<-s.sem
}
func processTask(ctx context.Context, id int, sem *Semaphore, wg *sync.WaitGroup) {
defer wg.Done()
// セマフォ取得
if err := sem.AcquireWithContext(ctx); err != nil {
fmt.Printf("タスク %d: セマフォ取得失敗 - %v\n", id, err)
return
}
defer sem.Release()
fmt.Printf("タスク %d 開始\n", id)
// 処理(キャンセル可能)
timer := time.NewTimer(2 * time.Second)
defer timer.Stop()
select {
case <-timer.C:
fmt.Printf("タスク %d 完了\n", id)
case <-ctx.Done():
fmt.Printf("タスク %d キャンセル\n", id)
}
}
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 8*time.Second)
defer cancel()
const maxConcurrent = 3
sem := NewSemaphore(maxConcurrent)
var wg sync.WaitGroup
for i := 1; i <= 10; i++ {
wg.Add(1)
go processTask(ctx, i, sem, &wg)
}
wg.Wait()
fmt.Println("すべてのタスクが完了しました")
}
---
課題5: タイムアウト付き select
基本解法
package main
import (
"fmt"
"time"
)
func fetchData(url string, ch chan<- string) {
// 外部API呼び出しをシミュレート
time.Sleep(3 * time.Second)
ch <- fmt.Sprintf("Data from %s", url)
}
func main() {
ch := make(chan string)
go fetchData("https://api.example.com", ch)
select {
case data := <-ch:
fmt.Println("成功:", data)
case <-time.After(2 * time.Second):
fmt.Println("タイムアウト: APIレスポンスが遅すぎます")
}
}
解法2: 複数API並行呼び出し版
package main
import (
"fmt"
"time"
)
type APIResponse struct {
URL string
Data string
Err error
}
func fetchDataWithTimeout(url string, timeout time.Duration) (string, error) {
ch := make(chan APIResponse, 1)
go func() {
// API呼び出しシミュレート(ランダムな遅延)
delay := time.Duration(1+url[len(url)-1]%3) * time.Second
time.Sleep(delay)
ch <- APIResponse{
URL: url,
Data: fmt.Sprintf("Response from %s", url),
}
}()
select {
case resp := <-ch:
return resp.Data, resp.Err
case <-time.After(timeout):
return "", fmt.Errorf("timeout after %v", timeout)
}
}
func main() {
urls := []string{
"https://api1.example.com",
"https://api2.example.com",
"https://api3.example.com",
}
type result struct {
url string
data string
err error
}
results := make(chan result, len(urls))
// 並行API呼び出し
for _, url := range urls {
go func(u string) {
data, err := fetchDataWithTimeout(u, 2*time.Second)
results <- result{url: u, data: data, err: err}
}(url)
}
// 結果を収集
for i := 0; i < len(urls); i++ {
res := <-results
if res.err != nil {
fmt.Printf("[%s] エラー: %v\n", res.url, res.err)
} else {
fmt.Printf("[%s] 成功: %s\n", res.url, res.data)
}
}
}
---
課題6: Done チャネルパターン
基本解法
package main
import (
"fmt"
"time"
)
func worker(done <-chan struct{}, name string) {
for {
select {
case <-done:
fmt.Printf("%s: 終了します\n", name)
return
default:
fmt.Printf("%s: 作業中...\n", name)
time.Sleep(500 * time.Millisecond)
}
}
}
func main() {
done := make(chan struct{})
go worker(done, "Worker 1")
go worker(done, "Worker 2")
go worker(done, "Worker 3")
time.Sleep(3 * time.Second)
fmt.Println("シャットダウン開始...")
close(done) // すべてのワーカーに終了を通知
time.Sleep(time.Second)
fmt.Println("シャットダウン完了")
}
解法2: グレースフルシャットダウン with WaitGroup
package main
import (
"fmt"
"sync"
"time"
)
func worker(done <-chan struct{}, name string, wg *sync.WaitGroup) {
defer wg.Done()
for {
select {
case <-done:
fmt.Printf("%s: クリーンアップ処理中...\n", name)
time.Sleep(500 * time.Millisecond) // クリーンアップ
fmt.Printf("%s: 終了しました\n", name)
return
default:
fmt.Printf("%s: 作業中...\n", name)
time.Sleep(500 * time.Millisecond)
}
}
}
func main() {
done := make(chan struct{})
var wg sync.WaitGroup
workers := []string{"Worker 1", "Worker 2", "Worker 3"}
for _, name := range workers {
wg.Add(1)
go worker(done, name, &wg)
}
time.Sleep(3 * time.Second)
fmt.Println("シャットダウン開始...")
close(done)
// すべてのワーカーが終了するまで待つ
wg.Wait()
fmt.Println("すべてのワーカーが正常に終了しました")
}
---
ベストプラクティスまとめ
1. チャネルの方向性を明示する
// 良い例
func producer(out chan<- int) { // 送信専用
out <- 42
}
func consumer(in <-chan int) { // 受信専用
val := <-in
}
2. 送信側でクローズする
// 良い例
func generateNumbers(max int) <-chan int {
out := make(chan int)
go func() {
defer close(out) // 必ずクローズ
for i := 0; i < max; i++ {
out <- i
}
}()
return out
}
3. context でキャンセル可能にする
// 良い例
func workWithContext(ctx context.Context) error {
resultCh := make(chan int)
go func() {
// 重い処理
time.Sleep(5 * time.Second)
resultCh <- 42
}()
select {
case result := <-resultCh:
fmt.Println("結果:", result)
return nil
case <-ctx.Done():
return ctx.Err()
}
}
4. バッファサイズは慎重に選択
// 悪い例
ch := make(chan int, 10000) // 過剰なバッファ
// 良い例
ch := make(chan int, numWorkers) // ワーカー数に基づく
5. エラーハンドリングを忘れない
type Result struct {
Value int
Err error
}
func processWithError(data int) Result {
if data < 0 {
return Result{Err: errors.New("負の値は処理できません")}
}
return Result{Value: data * 2}
}
---
パフォーマンス最適化のヒント
1. チャネルのバッファリング
// 遅い
unbuffered := make(chan int)
// 速い(適切なサイズ)
buffered := make(chan int, 100)
2. ゴルーチンプール
// 悪い例: ゴルーチンを大量作成
for i := 0; i < 10000; i++ {
go process(i)
}
// 良い例: ワーカープール
for i := 0; i < numCPU; i++ {
go worker(jobs, results)
}
3. select のデフォルトケース
// 非ブロッキング受信
select {
case data := <-ch:
process(data)
default:
// チャネルが空の場合の処理
}
---
まとめ
今日学んだ解法パターン:
- Pipeline: データ変換の連鎖
- Fan-out/Fan-in: 並行処理と集約
- Worker Pool: リソース制限付き並行処理
- Semaphore: 同時実行数の制御
- Timeout: タイムアウト制御
- Done Channel: グレースフルシャットダウン
すべてのパターンに共通する重要なポイント:
- チャネルは送信側でクローズ
- context でキャンセル可能に
- エラーハンドリングを忘れない
- リソースリークに注意
次のDay 6では、これらのパターンを組み合わせた高度な実装を学びます。