第14章: チャネル
学習目標
この章を終えると、以下ができるようになります:
- チャネルの概念と使い方を理解できる
- バッファ付き/なしチャネルの違いを説明できる
- select文で複数のチャネル操作を制御できる
- チャネルを使ったパイプラインパターンを実装できる
- hchan構造体の内部実装を理解できる
チャネルとは
基本概念
チャネルは、goroutine間で値を送受信するための型付きパイプです。
Rob Pikeの言葉: > "Don't communicate by sharing memory; share memory by communicating." > (メモリを共有して通信するな。通信してメモリを共有せよ。)
package main
import "fmt"
func main() {
// チャネルの作成
ch := make(chan int)
// goroutineで値を送信
go func() {
ch <- 42 // チャネルに値を送信
}()
// 値を受信
value := <-ch // チャネルから値を受信
fmt.Println(value) // 42
}
チャネルの型
var ch chan int // int型のチャネル
var strCh chan string // string型のチャネル
var structCh chan Person // 構造体型のチャネル
送受信の構文
ch <- value // 送信: chに値を送る
value := <-ch // 受信: chから値を受け取る
value, ok := <-ch // 受信(チャネルが閉じられているかチェック)
🔑 hchan:チャネルの内部構造
チャネルはruntime.hchan構造体として実装されています。
// runtime/chan.go (簡略版)
type hchan struct {
qcount uint // キュー内の要素数
dataqsiz uint // バッファサイズ
buf unsafe.Pointer // バッファへのポインタ
elemsize uint16 // 要素のサイズ
closed uint32 // クローズフラグ
elemtype *_type // 要素の型情報
sendx uint // 送信インデックス
recvx uint // 受信インデックス
recvq waitq // 受信待ちgoroutineのキュー
sendq waitq // 送信待ちgoroutineのキュー
lock mutex // 排他制御用mutex
}
type waitq struct {
first *sudog // 待機キューの先頭
last *sudog // 待機キューの末尾
}
type sudog struct {
g *g // 待機中のgoroutine
elem unsafe.Pointer // データ要素へのポインタ
next *sudog // 次の待機者
prev *sudog // 前の待機者
// その他多数のフィールド...
}
hchanの視覚化
hchan構造体:
┌─────────────────────────────────────┐
│ qcount: 3 ← 現在の要素数│
│ dataqsiz: 5 ← バッファ容量│
│ buf: 0x... ──────┐ │
│ elemsize: 8 │ │
│ closed: 0 │ │
│ sendx: 3 │ │
│ recvx: 0 │ │
│ recvq: nil │ │
│ sendq: nil │ │
│ lock: mutex │ │
└───────────────────────┼────────────┘
│
↓
buf (リングバッファ):
┌───┬───┬───┬───┬───┐
recvx → │ 1 │ 2 │ 3 │ │ │
└───┴───┴───┴───┴───┘
↑
sendx
バッファなしチャネル
同期的な通信
バッファなしチャネルは、送信側と受信側が両方準備できるまでブロックします。
package main
import (
"fmt"
"time"
)
func main() {
ch := make(chan string) // バッファなしチャネル
go func() {
fmt.Println("送信前")
ch <- "Hello" // 受信側が準備できるまでブロック
fmt.Println("送信後")
}()
time.Sleep(2 * time.Second)
fmt.Println("受信前")
msg := <-ch // 送信側が準備できるまでブロック
fmt.Println("受信後:", msg)
}
出力:
送信前
(2秒待機)
受信前
送信後
受信後: Hello
🔑 バッファなしチャネルの送信処理
送信側goroutine (G1):
1. ch <- value
↓
2. hchan.lockを取得
↓
3. recvq(受信待ちキュー)をチェック
├─ 受信待ちあり → 直接渡して完了
└─ 受信待ちなし → sendqに自分を追加してブロック
↓
4. 受信側が現れるまで待機(gopark)
↓
5. 受信側から起床(goready)
↓
6. 送信完了
詳細な流れ(疑似コード):
// runtime/chan.go (簡略版)
func chansend(c *hchan, ep unsafe.Pointer, block bool) bool {
// 1. nilチャネルチェック
if c == nil {
if !block {
return false
}
gopark(...) // 永遠にブロック
}
// 2. ロック取得
lock(&c.lock)
// 3. クローズチェック
if c.closed != 0 {
unlock(&c.lock)
panic("send on closed channel")
}
// 4. 受信待ちgoroutineがあるか?
if sg := c.recvq.dequeue(); sg != nil {
// 直接渡す(バッファなしチャネルの高速パス)
send(c, sg, ep)
unlock(&c.lock)
return true
}
// 5. バッファに空きがあるか?
if c.qcount < c.dataqsiz {
// バッファに書き込む
qp := chanbuf(c, c.sendx)
typedmemmove(c.elemtype, qp, ep)
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0 // リングバッファ
}
c.qcount++
unlock(&c.lock)
return true
}
// 6. ブロックしない場合は失敗
if !block {
unlock(&c.lock)
return false
}
// 7. 送信待ちキューに追加
gp := getg()
mysg := acquireSudog()
mysg.elem = ep
mysg.g = gp
c.sendq.enqueue(mysg)
// 8. ブロック(パーク)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), ...)
// 9. 起床後
releaseSudog(mysg)
return true
}
バッファ付きチャネル
非同期的な通信
バッファ付きチャネルは、バッファが満杯でない限り送信をブロックしません。
package main
import "fmt"
func main() {
ch := make(chan int, 3) // バッファサイズ3
// 送信側(ブロックしない)
ch <- 1
ch <- 2
ch <- 3
fmt.Println("3つの値を送信しました")
// 受信側
fmt.Println(<-ch) // 1
fmt.Println(<-ch) // 2
fmt.Println(<-ch) // 3
}
🔑 バッファ付きチャネルの内部動作
初期状態(バッファサイズ3):
┌───┬───┬───┐
│ │ │ │ qcount: 0
└───┴───┴───┘
↑ ↑
recvx sendx
ch <- 1 送信後:
┌───┬───┬───┐
│ 1 │ │ │ qcount: 1
└───┴───┴───┘
↑ ↑
r s
ch <- 2, ch <- 3 送信後:
┌───┬───┬───┐
│ 1 │ 2 │ 3 │ qcount: 3 (満杯)
└───┴───┴───┘
↑ ↑
r s
<-ch 受信後:
┌───┬───┬───┐
│ │ 2 │ 3 │ qcount: 2
└───┴───┴───┘
↑ ↑
r s
さらに ch <- 4 送信:
┌───┬───┬───┐
│ 4 │ 2 │ 3 │ qcount: 3
└───┴───┴───┘
↑ ↑
r s (リングバッファ)
バッファの容量と長さ
ch := make(chan int, 5)
ch <- 1
ch <- 2
fmt.Println("容量:", cap(ch)) // 5
fmt.Println("長さ:", len(ch)) // 2(現在の要素数)
🔑 チャネルの受信処理
// runtime/chan.go (簡略版)
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
// 1. nilチャネルチェック
if c == nil {
if !block {
return false, false
}
gopark(...) // 永遠にブロック
}
// 2. ロック取得
lock(&c.lock)
// 3. クローズ&空チェック
if c.closed != 0 && c.qcount == 0 {
unlock(&c.lock)
if ep != nil {
typedmemclr(c.elemtype, ep) // ゼロ値
}
return true, false
}
// 4. 送信待ちgoroutineがあるか?
if sg := c.sendq.dequeue(); sg != nil {
// バッファなし または バッファ満杯の場合
recv(c, sg, ep)
unlock(&c.lock)
return true, true
}
// 5. バッファに要素があるか?
if c.qcount > 0 {
qp := chanbuf(c, c.recvx)
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
unlock(&c.lock)
return true, true
}
// 6. ブロックしない場合は失敗
if !block {
unlock(&c.lock)
return false, false
}
// 7. 受信待ちキューに追加
gp := getg()
mysg := acquireSudog()
mysg.elem = ep
mysg.g = gp
c.recvq.enqueue(mysg)
// 8. ブロック(パーク)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), ...)
// 9. 起床後
releaseSudog(mysg)
return true, true
}
チャネルの閉じ方
close関数
送信側がチャネルを閉じることで、受信側に「もう値が来ない」と伝えます。
package main
import "fmt"
func producer(ch chan<- int) {
for i := 1; i <= 5; i++ {
ch <- i
}
close(ch) // チャネルを閉じる
}
func main() {
ch := make(chan int)
go producer(ch)
// rangeで受信(チャネルが閉じられるまで)
for value := range ch {
fmt.Println(value)
}
fmt.Println("チャネルが閉じられました")
}
🔑 close()の内部実装
// runtime/chan.go (簡略版)
func closechan(c *hchan) {
// 1. nilチャネルチェック
if c == nil {
panic("close of nil channel")
}
// 2. ロック取得
lock(&c.lock)
// 3. 二重クローズチェック
if c.closed != 0 {
unlock(&c.lock)
panic("close of closed channel")
}
// 4. クローズフラグを設定
c.closed = 1
var glist gList
// 5. すべての受信待ちgoroutineを起床
for {
sg := c.recvq.dequeue()
if sg == nil {
break
}
if sg.elem != nil {
typedmemclr(c.elemtype, sg.elem) // ゼロ値
sg.elem = nil
}
glist.push(sg.g)
}
// 6. すべての送信待ちgoroutineを起床(panicする)
for {
sg := c.sendq.dequeue()
if sg == nil {
break
}
sg.elem = nil
glist.push(sg.g)
}
unlock(&c.lock)
// 7. すべての待機goroutineを起床
for !glist.empty() {
gp := glist.pop()
goready(gp, 3)
}
}
閉じられたチャネルの挙動
ch := make(chan int, 2)
ch <- 1
ch <- 2
close(ch)
// 受信は可能(バッファ内の値を取得)
fmt.Println(<-ch) // 1
fmt.Println(<-ch) // 2
// さらに受信するとゼロ値が返る
value, ok := <-ch
fmt.Println(value, ok) // 0 false
// 送信するとパニック
// ch <- 3 // panic: send on closed channel
方向付きチャネル
送信専用と受信専用
// 送信専用チャネル
func producer(ch chan<- int) {
ch <- 42
// value := <-ch // コンパイルエラー
}
// 受信専用チャネル
func consumer(ch <-chan int) {
value := <-ch
fmt.Println(value)
// ch <- 100 // コンパイルエラー
}
func main() {
ch := make(chan int) // 双方向チャネル
go producer(ch) // 送信専用として渡す
consumer(ch) // 受信専用として渡す
}
型安全性の向上
package main
import "fmt"
func generateNumbers(out chan<- int) {
for i := 1; i <= 5; i++ {
out <- i
}
close(out)
}
func squareNumbers(in <-chan int, out chan<- int) {
for num := range in {
out <- num * num
}
close(out)
}
func main() {
nums := make(chan int)
squares := make(chan int)
go generateNumbers(nums)
go squareNumbers(nums, squares)
for square := range squares {
fmt.Println(square)
}
}
select文
複数チャネルの操作
select文は、複数のチャネル操作から1つを選択します。
package main
import (
"fmt"
"time"
)
func main() {
ch1 := make(chan string)
ch2 := make(chan string)
go func() {
time.Sleep(1 * time.Second)
ch1 <- "one"
}()
go func() {
time.Sleep(2 * time.Second)
ch2 <- "two"
}()
for i := 0; i < 2; i++ {
select {
case msg1 := <-ch1:
fmt.Println("受信:", msg1)
case msg2 := <-ch2:
fmt.Println("受信:", msg2)
}
}
}
🔑 selectの内部実装
selectは非常に複雑な実装になっています。基本的な流れ:
// runtime/select.go (簡略版)
func selectgo(cas0 *scase, order0 *uint16, ncases int) (int, bool) {
// 1. ケースの配列を準備
cas1 := (*[1 << 16]scase)(unsafe.Pointer(cas0))
order1 := (*[1 << 17]uint16)(unsafe.Pointer(order0))
scases := cas1[:ncases:ncases]
pollorder := order1[:ncases:ncases]
lockorder := order1[ncases:][:ncases:ncases]
// 2. ポーリング順序とロック順序をシャッフル
for i := 0; i < ncases; i++ {
pollorder[i] = uint16(i)
}
// Fisher-Yates shuffle
for i := 1; i < ncases; i++ {
j := fastrandn(uint32(i + 1))
pollorder[i], pollorder[j] = pollorder[j], pollorder[i]
}
// 3. ロック順序を決定(デッドロック回避)
for i := 0; i < ncases; i++ {
lockorder[i] = pollorder[i]
}
// ソート
// 4. すべてのチャネルをロック
sellock(scases, lockorder)
// 5. 即座に実行可能なケースを探す
for _, casei := range pollorder {
cas := &scases[casei]
c := cas.c
if cas.kind == caseRecv {
if c.qcount > 0 || c.sendq.first != nil {
goto recv
}
} else if cas.kind == caseSend {
if c.closed != 0 {
goto sclose
}
if c.qcount < c.dataqsiz || c.recvq.first != nil {
goto send
}
} else { // caseDefault
selunlock(scases, lockorder)
return casei, false
}
}
// 6. すべてのチャネルに自分を登録
gp := getg()
for _, casei := range lockorder {
cas := &scases[casei]
c := cas.c
sg := acquireSudog()
sg.g = gp
sg.c = c
if cas.kind == caseRecv {
c.recvq.enqueue(sg)
} else {
c.sendq.enqueue(sg)
}
}
// 7. ブロック(パーク)
gopark(selparkcommit, nil, ...)
// 8. 起床後、選ばれたケースを処理
sellock(scases, lockorder)
// ... クリーンアップと処理
selunlock(scases, lockorder)
return casi, recvOK
recv:
// 受信処理
// ...
send:
// 送信処理
// ...
sclose:
// クローズされたチャネルへの送信
panic("send on closed channel")
}
selectの動作フロー:
1. ケースの準備
┌────────────────┐
│ case <-ch1: │
│ case ch2 <- v: │
│ default: │
└────────┬───────┘
↓
2. ランダムな順序でポーリング
┌────────────────┐
│ ケースを │
│ シャッフル │
└────────┬───────┘
↓
3. すべてのチャネルをロック(デッドロック回避のため順序付け)
┌────────────────┐
│ lockorder順に │
│ ロック取得 │
└────────┬───────┘
↓
4. 即座に実行可能なケースを探す
┌────────────────┐
│ 送受信可能? │
├─Yes──┬──No────┤
│ │ │
実行 │ すべてのチャネルに
│ 自分を登録してブロック
↓
┌────────────────┐
│ 起床後処理 │
└────────────────┘
タイムアウトの実装
package main
import (
"fmt"
"time"
)
func main() {
ch := make(chan string)
go func() {
time.Sleep(2 * time.Second)
ch <- "result"
}()
select {
case result := <-ch:
fmt.Println("成功:", result)
case <-time.After(1 * time.Second):
fmt.Println("タイムアウト")
}
}
💡 time.After()は新しいチャネルを返します。指定時間後にそのチャネルが閉じられます。
defaultケース
defaultケースは、他のケースが準備できていない場合に即座に実行されます。
package main
import "fmt"
func main() {
ch := make(chan int, 1)
select {
case ch <- 42:
fmt.Println("送信成功")
default:
fmt.Println("送信できません")
}
select {
case value := <-ch:
fmt.Println("受信:", value)
default:
fmt.Println("受信できません")
}
}
非ブロッキング操作
func tryReceive(ch <-chan int) (int, bool) {
select {
case value := <-ch:
return value, true
default:
return 0, false
}
}
func trySend(ch chan<- int, value int) bool {
select {
case ch <- value:
return true
default:
return false
}
}
パイプラインパターン
基本的なパイプライン
package main
import "fmt"
// ステージ1: 数値を生成
func generate(nums ...int) <-chan int {
out := make(chan int)
go func() {
for _, n := range nums {
out <- n
}
close(out)
}()
return out
}
// ステージ2: 2乗する
func square(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for n := range in {
out <- n * n
}
close(out)
}()
return out
}
// ステージ3: 偶数のみフィルター
func filterEven(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for n := range in {
if n%2 == 0 {
out <- n
}
}
close(out)
}()
return out
}
func main() {
// パイプライン構築
nums := generate(1, 2, 3, 4, 5)
squares := square(nums)
evens := filterEven(squares)
// 結果の出力
for result := range evens {
fmt.Println(result) // 4, 16
}
}
🔑 パイプラインの視覚化
generate → square → filterEven → main
┌─────────┐ ┌─────────┐ ┌─────────┐
│generate │───→│ square │───→│filterEve│───→ main
└─────────┘ └─────────┘ │ n │
└─────────┘
ch1 ch2 ch3
1 ───→ 1*1=1 ─X─→ (奇数除外)
2 ───→ 2*2=4 ───→ 4 ───→ 出力
3 ───→ 3*3=9 ─X─→ (奇数除外)
4 ───→ 4*4=16 ──→ 16 ──→ 出力
5 ───→ 5*5=25 ─X─→ (奇数除外)
ファンアウト/ファンイン
package main
import (
"fmt"
"sync"
)
func fanOut(in <-chan int, workers int) []<-chan int {
channels := make([]<-chan int, workers)
for i := 0; i < workers; i++ {
out := make(chan int)
channels[i] = out
go func(ch chan<- int) {
defer close(ch)
for num := range in {
ch <- num * num // 重い処理のシミュレーション
}
}(out)
}
return channels
}
func fanIn(channels ...<-chan int) <-chan int {
out := make(chan int)
var wg sync.WaitGroup
for _, ch := range channels {
wg.Add(1)
go func(c <-chan int) {
defer wg.Done()
for num := range c {
out <- num
}
}(ch)
}
go func() {
wg.Wait()
close(out)
}()
return out
}
func main() {
// 入力チャネル
in := make(chan int)
go func() {
for i := 1; i <= 10; i++ {
in <- i
}
close(in)
}()
// ファンアウト(3つのワーカーに分配)
workers := fanOut(in, 3)
// ファンイン(結果を1つのチャネルに集約)
results := fanIn(workers...)
// 結果の出力
for result := range results {
fmt.Println(result)
}
}
🔑 ファンアウト/ファンインの視覚化
ファンアウト(Fan-Out):
┌─────────┐
in ───→│ worker1 │───→ ch1
├─────────┤
───→│ worker2 │───→ ch2
├─────────┤
───→│ worker3 │───→ ch3
└─────────┘
ファンイン(Fan-In):
ch1 ───┐
├───→ out
ch2 ───┤
│
ch3 ───┘
実践例:ワーカープール
package main
import (
"fmt"
"sync"
"time"
)
type Job struct {
ID int
Data string
}
type Result struct {
Job Job
Output string
Duration time.Duration
}
func worker(id int, jobs <-chan Job, results chan<- Result, wg *sync.WaitGroup) {
defer wg.Done()
for job := range jobs {
start := time.Now()
// 処理のシミュレーション
time.Sleep(100 * time.Millisecond)
output := fmt.Sprintf("Worker %d processed job %d: %s",
id, job.ID, job.Data)
results <- Result{
Job: job,
Output: output,
Duration: time.Since(start),
}
}
}
func main() {
jobs := make(chan Job, 10)
results := make(chan Result, 10)
var wg sync.WaitGroup
// 3つのワーカーを起動
numWorkers := 3
for i := 1; i <= numWorkers; i++ {
wg.Add(1)
go worker(i, jobs, results, &wg)
}
// ジョブの送信
go func() {
for i := 1; i <= 10; i++ {
jobs <- Job{
ID: i,
Data: fmt.Sprintf("data-%d", i),
}
}
close(jobs)
}()
// 結果収集用goroutine
go func() {
wg.Wait()
close(results)
}()
// 結果の出力
for result := range results {
fmt.Printf("✓ Job %d completed in %v\n",
result.Job.ID, result.Duration)
}
}
チャネルの落とし穴
落とし穴1: デッドロック
// 悪い例: デッドロック
func main() {
ch := make(chan int)
ch <- 42 // 受信側がいないのでブロック → デッドロック
fmt.Println(<-ch)
}
// 良い例: goroutineで送信
func main() {
ch := make(chan int)
go func() {
ch <- 42
}()
fmt.Println(<-ch)
}
落とし穴2: チャネルリーク
// 悪い例: チャネルリーク
func leak() <-chan int {
ch := make(chan int)
go func() {
// 重い処理
result := heavyComputation()
ch <- result // 誰も受信しないとgoroutineがリークする
}()
return ch
}
// 良い例: コンテキストでキャンセル可能
func noLeak(ctx context.Context) <-chan int {
ch := make(chan int)
go func() {
result := heavyComputation()
select {
case ch <- result:
case <-ctx.Done():
return
}
}()
return ch
}
落とし穴3: 閉じたチャネルへの送信
ch := make(chan int)
close(ch)
ch <- 1 // panic: send on closed channel
ルール:
- チャネルを閉じるのは送信側の責任
- 受信側は閉じてはいけない
- 閉じたチャネルへの送信はパニック
⚠️ チャネルのパフォーマンス考慮事項
1. チャネルのコスト
チャネル操作のコスト:
1. ロック取得/解放: 約20ns
2. メモリコピー: 要素サイズに依存
3. goroutine切り替え: 約1μs
⇒ 頻繁な小さいデータの送受信には向かない
2. バッファサイズの選択
// 同期が必要: バッファなし
done := make(chan struct{})
// キューとして使用: 適切なバッファサイズ
jobs := make(chan Job, numWorkers*2)
// 既知のサイズ: 完全なバッファ
items := []int{1, 2, 3, 4, 5}
ch := make(chan int, len(items))
for _, item := range items {
ch <- item // ブロックしない
}
close(ch)
3. チャネル vs Mutex
// チャネルが適している場合:
// - goroutine間の通信
// - パイプライン処理
// - シグナリング
// Mutexが適している場合:
// - 単純な排他制御
// - 高頻度のアクセス
// - メモリ効率が重要
ベストプラクティス
1. 送信側がチャネルを閉じる
func producer(ch chan<- int) {
defer close(ch) // 関数終了時に閉じる
for i := 0; i < 10; i++ {
ch <- i
}
}
2. nilチャネルでselectを無効化
for {
select {
case v, ok := <-ch1:
if !ok {
ch1 = nil // このcaseを無効化
continue
}
process(v)
case v, ok := <-ch2:
if !ok {
ch2 = nil
continue
}
process(v)
}
if ch1 == nil && ch2 == nil {
break
}
}
3. チャネルのバッファサイズを適切に
// 送受信が同期的でない場合はバッファを使う
ch := make(chan Task, expectedConcurrency)
// 同期が必要な場合はバッファなし
done := make(chan struct{})
自己確認問題
まとめ
この章では、チャネルの内部構造と実装を詳しく学びました。
重要ポイント:
- hchan構造体: チャネルの内部実装、リングバッファと待機キュー
- バッファなし: 送受信の同期を保証(直接渡し)
- バッファ付き: 非同期通信(リングバッファ使用)
- select: 複数チャネルの多重化、ランダムポーリング
- パイプライン: チャネルを繋いでデータフロー構築
ベストプラクティス:
- 送信側がチャネルを閉じる
- 方向付きチャネルで型安全性を高める
- デッドロックに注意
- チャネルリークを防ぐ
- 適切なバッファサイズを選ぶ
次の章では、パッケージとモジュールの管理について、特にgo.modとMVSアルゴリズムを学びます。