Day 5: 並行処理入門 - 解説

Rustの並行処理が安全な理由

1. 型システムによる保証

Rustは型システムを使って、並行処理のバグをコンパイル時に防ぎます。他の言語では実行時にしか発見できないバグを、コードを書いている段階で検出できます。

// ❌ これはコンパイルエラー
let mut data = vec![1, 2, 3];

thread::spawn(|| {
    data.push(4);  // エラー: dataの所有権がない
});

data.push(5);  // エラー: 並行アクセスの可能性

コンパイラのエラーメッセージ:

error[E0373]: closure may outlive the current function
  |
  | thread::spawn(|| {
  |               ^^ may outlive borrowed value `data`

このエラーは「スレッドがdataより長生きする可能性があり、dangling pointerになる」と教えてくれます。

2. Send と Sync の深堀り

Send トレイト

定義: T: Send は「型Tの値の所有権をスレッド間で安全に移動できる」ことを意味します。

// Sendの実装を確認
fn assert_send<T: Send>() {}

fn main() {
    assert_send::<i32>();           // ✅ OK
    assert_send::<String>();        // ✅ OK
    assert_send::<Vec<i32>>();      // ✅ OK
    assert_send::<Arc<i32>>();      // ✅ OK

    // assert_send::<Rc<i32>>();    // ❌ エラー: Rc は Send ではない
    // assert_send::<*const i32>(); // ❌ エラー: 生ポインタは Send ではない
}

なぜRcはSendでないか?

Rcの参照カウントはアトミック操作ではないため、複数スレッドで同時に増減すると、データ競合が発生します:

スレッド1                  スレッド2
  │                          │
  ├─ count を読む (2)        │
  │                          ├─ count を読む (2)
  ├─ count + 1 = 3           │
  │                          ├─ count + 1 = 3
  ├─ count に 3 を書く       │
  │                          ├─ count に 3 を書く
  ↓                          ↓
  count = 3 (期待値: 4)

Sync トレイト

定義: T: Sync は「型Tの値への参照を複数スレッドから安全に共有できる」ことを意味します。

数学的に: T: Sync ⇔ &T: Send

// Syncの実装を確認
fn assert_sync<T: Sync>() {}

fn main() {
    assert_sync::<i32>();              // ✅ OK
    assert_sync::<Arc<i32>>();         // ✅ OK
    assert_sync::<Mutex<i32>>();       // ✅ OK

    // assert_sync::<Cell<i32>>();     // ❌ エラー: Cell は Sync ではない
    // assert_sync::<RefCell<i32>>();  // ❌ エラー: RefCell は Sync ではない
}

なぜCellはSyncでないか?

Cellは内部可変性を提供しますが、スレッドセーフではありません:

use std::cell::Cell;

let cell = Cell::new(0);

// ❌ これはコンパイルエラー
thread::spawn(|| {
    cell.set(10);  // エラー: Cell<i32> は Sync ではない
});

3. 所有権システムとライフタイム

Rustの所有権システムは、並行処理でも一貫して機能します:

let data = vec![1, 2, 3];

// moveキーワードで所有権をクロージャに移動
let handle = thread::spawn(move || {
    println!("{:?}", data);  // ✅ OK: 所有権を持っている
});

// println!("{:?}", data);  // ❌ エラー: 所有権は移動済み

handle.join().unwrap();

ライフタイムの視覚化:

時間軸 →

main関数:     ├──────────────┤
              │              │
data:         ├──────┤       │
                     ↓ move  │
子スレッド:          ├───────┤
                     │       │
data(moved):         ├───────┤

Arc と Rc の違い

メモリレイアウトの比較

Rc<T>:
┌─────────────────┐
│ データ          │
│ 参照カウント    │ ← 非アトミック(通常の整数)
│ 弱参照カウント  │
└─────────────────┘

Arc<T>:
┌─────────────────┐
│ データ          │
│ 参照カウント    │ ← アトミック(CPU命令レベルで同期)
│ 弱参照カウント  │
└─────────────────┘

パフォーマンス比較

use std::rc::Rc;
use std::sync::Arc;
use std::time::Instant;

fn benchmark_rc() {
    let rc = Rc::new(42);
    let start = Instant::now();

    for _ in 0..1_000_000 {
        let _clone = Rc::clone(&rc);
    }

    println!("Rc: {:?}", start.elapsed());
}

fn benchmark_arc() {
    let arc = Arc::new(42);
    let start = Instant::now();

    for _ in 0..1_000_000 {
        let _clone = Arc::clone(&arc);
    }

    println!("Arc: {:?}", start.elapsed());
}

典型的な結果:

Rc:  15ms   (高速)
Arc: 45ms   (やや遅い - アトミック操作のコスト)

使い分けの原則:

  • シングルスレッド → Rc(高速)
  • マルチスレッド → Arc(安全)
  • Mutex の内部動作

    Mutexのロックメカニズム

    Mutexは2つの役割を果たします:

  • 排他制御: 同時に1つのスレッドだけがアクセス
  • 内部可変性: イミュータブルな参照から値を変更

use std::sync::Mutex;

let mutex = Mutex::new(0);

// lock() は MutexGuard<T> を返す
let mut guard = mutex.lock().unwrap();

// MutexGuard は Deref と DerefMut を実装
*guard += 1;

// スコープを抜けると自動的にロック解放(Drop実装)

Mutexの状態遷移図

初期状態(ロック解放)
     │
     │ lock()
     ↓
ロック取得 ────── 他のスレッドは待機
     │           (ブロッキング)
     │ drop(guard)
     ↓
ロック解放 ────── 待機中のスレッドが取得

MutexGuard の設計

MutexGuardRAIIパターン(Resource Acquisition Is Initialization)の好例です:

pub struct MutexGuard<'a, T> {
    mutex: &'a Mutex<T>,
    // ... 内部フィールド
}

impl<T> Deref for MutexGuard<'_, T> {
    type Target = T;
    fn deref(&self) -> &T { /* ... */ }
}

impl<T> DerefMut for MutexGuard<'_, T> {
    fn deref_mut(&mut self) -> &mut T { /* ... */ }
}

impl<T> Drop for MutexGuard<'_, T> {
    fn drop(&mut self) {
        // ここでロックを解放
    }
}

RAII の利点:

  • panic!が発生しても自動的にロック解放
  • 明示的なunlock()呼び出し不要
  • スコープでロックの寿命を制御

デッドロックの具体例と対策

デッドロックの例:

let mutex1 = Arc::new(Mutex::new(0));
let mutex2 = Arc::new(Mutex::new(0));

let m1 = Arc::clone(&mutex1);
let m2 = Arc::clone(&mutex2);

// スレッド1
thread::spawn(move || {
    let _g1 = m1.lock().unwrap();      // mutex1 をロック
    thread::sleep(Duration::from_millis(10));
    let _g2 = m2.lock().unwrap();      // mutex2 をロック(待機)
});

// スレッド2
let _g2 = mutex2.lock().unwrap();      // mutex2 をロック
thread::sleep(Duration::from_millis(10));
let _g1 = mutex1.lock().unwrap();      // mutex1 をロック(待機)

// ⚠️ デッドロック発生

デッドロックの視覚化:

スレッド1           mutex1         mutex2          スレッド2
    │                 │              │                │
    ├─ lock() ──→   取得             │                │
    │                 │              │                │
    │                 │              │        ←─ lock() ┤
    │                 │            取得               │
    │                 │              │                │
    ├─ lock() ───────────────→   待機 ←───────── lock() ┤
                                     ↓                ↓
                               デッドロック

対策1: ロックの順序を統一

// ✅ 常に mutex1 → mutex2 の順でロック
fn safe_lock(mutex1: &Mutex<i32>, mutex2: &Mutex<i32>) {
    let _g1 = mutex1.lock().unwrap();
    let _g2 = mutex2.lock().unwrap();
    // 処理
}

対策2: try_lock を使う

// タイムアウト付きロック取得
loop {
    if let Ok(g1) = mutex1.try_lock() {
        if let Ok(g2) = mutex2.try_lock() {
            // 両方取得成功
            break;
        }
    }
    // 失敗したら少し待って再試行
    thread::sleep(Duration::from_millis(1));
}

チャネルの設計哲学

Goの影響

Rustのチャネルは、Goの「CSP(Communicating Sequential Processes)」モデルに影響を受けています:

> Goの格言: "Do not communicate by sharing memory; instead, share memory by communicating." > (メモリを共有して通信するのではなく、通信することでメモリを共有せよ)

チャネルの型

use std::sync::mpsc;

// 非同期チャネル(バッファ無制限)
let (tx, rx) = mpsc::channel();

// 同期チャネル(バッファサイズ指定)
let (tx, rx) = mpsc::sync_channel(10);  // バッファサイズ10

チャネルと所有権

チャネルの最大の利点は、所有権の移動により、データ競合を根本的に防ぐことです:

let (tx, rx) = mpsc::channel();

thread::spawn(move || {
    let data = vec![1, 2, 3];
    tx.send(data).unwrap();
    // ここで data は移動済みなので使用不可
    // println!("{:?}", data);  // ❌ エラー
});

let received = rx.recv().unwrap();
// ここで received が新しい所有者
println!("{:?}", received);  // ✅ OK

所有権の移動の視覚化:

送信側スレッド          チャネル          受信側スレッド
     │                    │                    │
  data (所有)             │                    │
     │                    │                    │
     ├─ send(data) ───→  キュー               │
     │                   [data]               │
  (所有権失う)            │                    │
     ×                    │                    │
                          │             recv() ┤
                          │                    │
                         空                 data (所有)

チャネルのパフォーマンス特性

use std::sync::mpsc;
use std::time::Instant;

fn main() {
    let (tx, rx) = mpsc::channel();

    let start = Instant::now();

    // 送信スレッド
    thread::spawn(move || {
        for i in 0..1_000_000 {
            tx.send(i).unwrap();
        }
    });

    // 受信
    let mut sum = 0;
    for _ in 0..1_000_000 {
        sum += rx.recv().unwrap();
    }

    println!("Time: {:?}", start.elapsed());
    println!("Sum: {}", sum);
}

典型的な結果: 100万メッセージを約200ms で送受信(1メッセージあたり200ns)

パターン選択のガイドライン

Arc + Mutex vs チャネル

基準 Arc + Mutex チャネル
**使用例** 共有状態、キャッシュ プロデューサー/コンシューマー
**データフロー** 双方向 単方向
**結合度** 密結合 疎結合
**パフォーマンス** ロック競合あり コピーのオーバーヘッド
**デバッグ** デッドロックのリスク 比較的安全

決定木

データを共有したい
    │
    ├─ 読み取りのみ?
    │   └─ Yes → Arc<T>
    │
    ├─ 書き込みあり?
    │   ├─ 頻繁な更新?
    │   │   └─ Yes → Arc<Mutex<T>>
    │   │
    │   └─ 読み取りが多い?
    │       └─ Yes → Arc<RwLock<T>>
    │
    └─ データを渡したい?
        └─ チャネル(mpsc::channel)

メンタルモデル:並行処理の考え方

1. 所有権ベースの思考

従来の言語(C/Java/Python):

すべてのスレッドがすべてのメモリにアクセス可能
     ↓
開発者が手動で同期を管理
     ↓
バグが実行時に発見される

Rust:

所有権システムがアクセスを制限
     ↓
コンパイラが同期の必要性を検出
     ↓
バグがコンパイル時に発見される

2. 並行処理のレイヤー

レイヤー4: アプリケーションロジック
           ├─ ビジネスルール
           └─ データ処理

レイヤー3: 並行処理パターン
           ├─ チャネル
           ├─ Arc + Mutex
           └─ 非同期 (async/await)

レイヤー2: 同期プリミティブ
           ├─ Mutex, RwLock
           ├─ Atomic
           └─ Condvar

レイヤー1: OSスレッド / 非同期ランタイム
           ├─ std::thread
           └─ tokio::task

Rustの美しさは、レイヤー3以上で作業すれば、低レベルの詳細を気にしなくて良いことです。

3. Arcの参照カウント視覚化

let arc1 = Arc::new(42);     // count: 1
let arc2 = Arc::clone(&arc1); // count: 2
let arc3 = Arc::clone(&arc1); // count: 3

// メモリの状態:
//
//  ┌──────────────────┐
//  │ データ: 42       │
//  │ strong_count: 3  │
//  │ weak_count: 0    │
//  └──────────────────┘
//       ↑   ↑   ↑
//       │   │   │
//     arc1 arc2 arc3

drop(arc2);  // count: 2

// メモリの状態:
//  ┌──────────────────┐
//  │ データ: 42       │
//  │ strong_count: 2  │ ← 減った
//  │ weak_count: 0    │
//  └──────────────────┘
//       ↑   ↑
//       │   │
//     arc1 arc3

実践的な設計パターン

パターン1: ワーカープール

use std::sync::mpsc;
use std::sync::{Arc, Mutex};
use std::thread;

struct WorkerPool {
    workers: Vec<thread::JoinHandle<()>>,
    sender: mpsc::Sender<Task>,
}

type Task = Box<dyn FnOnce() + Send + 'static>;

impl WorkerPool {
    fn new(size: usize) -> Self {
        let (sender, receiver) = mpsc::channel();
        let receiver = Arc::new(Mutex::new(receiver));

        let workers = (0..size)
            .map(|id| {
                let receiver = Arc::clone(&receiver);
                thread::spawn(move || loop {
                    let task = receiver.lock().unwrap().recv();
                    match task {
                        Ok(task) => {
                            println!("Worker {} executing task", id);
                            task();
                        }
                        Err(_) => break,
                    }
                })
            })
            .collect();

        WorkerPool { workers, sender }
    }

    fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        self.sender.send(Box::new(f)).unwrap();
    }
}

パターン2: Actor モデル

use std::sync::mpsc;
use std::thread;

struct Actor {
    sender: mpsc::Sender<Message>,
}

enum Message {
    Increment,
    Decrement,
    GetValue(mpsc::Sender<i32>),
}

impl Actor {
    fn new() -> Self {
        let (tx, rx) = mpsc::channel();

        thread::spawn(move || {
            let mut value = 0;
            for msg in rx {
                match msg {
                    Message::Increment => value += 1,
                    Message::Decrement => value -= 1,
                    Message::GetValue(reply) => {
                        reply.send(value).unwrap();
                    }
                }
            }
        });

        Actor { sender: tx }
    }

    fn increment(&self) {
        self.sender.send(Message::Increment).unwrap();
    }

    fn get_value(&self) -> i32 {
        let (tx, rx) = mpsc::channel();
        self.sender.send(Message::GetValue(tx)).unwrap();
        rx.recv().unwrap()
    }
}

セルフチェック質問

理解度を確認するための質問です:

基礎レベル

  • Q: Arc::clone(&arc) は何をしますか?
A: データをコピーせず、参照カウントをアトミックに増やします。

  • Q: Mutex::lock() の戻り値の型は?
A: Result, PoisonError> です。

  • Q: なぜ Rc はスレッド間で共有できないのですか?
A: 参照カウントがアトミックでないため、データ競合が発生する可能性があります。

中級レベル

  • Q: Arc>>Mutex>> の違いは?
A: 前者は複数スレッドでVecを共有+変更可能。後者はMutexがArcを保護(通常は意味がない)。

  • Q: チャネルでtxをdropしないとどうなりますか?
A: rxが永遠に待ち続け、プログラムがハングします。

  • Q: SendSync の関係は?
A: T: Sync&T: Send。型がSyncなら、その参照はSend。

上級レベル

  • Q: なぜ Arc>Clone を実装していますか?
A: ArcClone を実装し、MutexClone は不要(Arc経由で共有)。

  • Q: デッドロックを検出する方法は?
A: try_lock() を使うか、ロックの順序を統一。または parking_lot クレートのデッドロック検出機能。

  • Q: Mutex より RwLock が遅くなる場合は?
A: 書き込みが頻繁な場合。RwLock は読み取り/書き込みの調整にオーバーヘッドがあります。

次のステップ

Day 5の課題を完了したら:

  • 復習: 3つのExerciseを見直し、パターンを理解
  • 応用: 自分のプロジェクトで並行処理を使ってみる
  • 準備: Day 6のキャップストーンで全概念を統合

参考資料

深掘り学習

ビデオ

Day 6で、これまでの全てを統合したプロジェクトに挑戦しましょう!