rust-concurrency - 解説

実装の詳細

Arc> パターン

// Arc: 参照カウント(スレッドセーフ)
// Mutex: 排他ロック

let data = Arc::new(Mutex::new(vec![1, 2, 3]));

let data_clone = Arc::clone(&data);
thread::spawn(move || {
    let mut guard = data_clone.lock().unwrap();
    guard.push(4);
    // guard がドロップされるとロック解除
});

チャネルの種類

// mpsc: Multiple Producer, Single Consumer
let (tx, rx) = mpsc::channel();

// 複数の送信者
let tx2 = tx.clone();

// 単一の受信者
for msg in rx {
    println!("{}", msg);
}

スレッドプールのアーキテクチャ

┌─────────────┐
│ ThreadPool  │
├─────────────┤      ┌─────────────┐
│ sender ─────┼─────→│   Channel   │
│ workers[]   │      └─────┬───────┘
└─────────────┘            │
                           ▼
              ┌────────────────────────┐
              │   Mutex<Receiver>      │
              └────────────────────────┘
                    ▲    ▲    ▲
        ┌───────────┼────┼────┼───────────┐
        │           │    │    │           │
   ┌────┴────┐ ┌────┴────┐ ┌────┴────┐
   │ Worker1 │ │ Worker2 │ │ Worker3 │
   └─────────┘ └─────────┘ └─────────┘

よくある間違い

1. ロックの保持しすぎ

// 間違い: ロックを長時間保持
let mut guard = data.lock().unwrap();
expensive_computation(&guard);  // ロック保持中
drop(guard);

// 正しい: 必要な時だけロック
let value = {
    let guard = data.lock().unwrap();
    guard.clone()
};
expensive_computation(&value);  // ロック解除済み

2. デッドロック

// 間違い: ロック順序が不定
let a = Arc::new(Mutex::new(1));
let b = Arc::new(Mutex::new(2));

// スレッド1
let _a = a.lock();
let _b = b.lock();  // スレッド2がbを持っていたらデッドロック

// 正しい: 常に同じ順序
fn lock_both(a: &Mutex<i32>, b: &Mutex<i32>) -> (MutexGuard<i32>, MutexGuard<i32>) {
    let a = a.lock().unwrap();
    let b = b.lock().unwrap();
    (a, b)
}

3. チャネルの送信側をドロップしない

// 間違い: 受信ループが終了しない
let (tx, rx) = mpsc::channel();

thread::spawn(move || {
    for item in rx {  // 永遠にブロック
        process(item);
    }
});

// tx がドロップされない限り、ループは終了しない

// 正しい: 明示的にドロップ
drop(tx);  // これで rx のループが終了

パフォーマンス最適化

アトミック操作

use std::sync::atomic::{AtomicU64, Ordering};

struct AtomicCounter {
    count: AtomicU64,
}

impl AtomicCounter {
    fn new() -> Self {
        Self { count: AtomicU64::new(0) }
    }

    fn increment(&self) {
        self.count.fetch_add(1, Ordering::Relaxed);
    }

    fn get(&self) -> u64 {
        self.count.load(Ordering::Relaxed)
    }
}

RwLock(読み書きロック)

use std::sync::RwLock;

let data = RwLock::new(vec![1, 2, 3]);

// 複数スレッドが同時に読み取り可能
let read1 = data.read().unwrap();
let read2 = data.read().unwrap();

// 書き込みは排他的
let mut write = data.write().unwrap();
write.push(4);

パーキングロット

// 標準のMutexより効率的な実装
// parking_lot クレート
use parking_lot::Mutex;

let data = Mutex::new(0);
// 標準のMutexと同じ使い方
// ただしpoisonが発生しない

発展トピック

Condvar(条件変数)

use std::sync::{Condvar, Mutex};

let pair = Arc::new((Mutex::new(false), Condvar::new()));

// 待機側
let (lock, cvar) = &*pair;
let mut started = lock.lock().unwrap();
while !*started {
    started = cvar.wait(started).unwrap();
}

// 通知側
let (lock, cvar) = &*pair;
*lock.lock().unwrap() = true;
cvar.notify_one();

Barrier

use std::sync::Barrier;

let barrier = Arc::new(Barrier::new(4));

for _ in 0..4 {
    let b = Arc::clone(&barrier);
    thread::spawn(move || {
        // 準備処理
        b.wait();  // 全スレッドがここに到達するまで待機
        // 本処理(全スレッドが同時に開始)
    });
}