rust-concurrency - 解説
実装の詳細
Arc> パターン
// Arc: 参照カウント(スレッドセーフ)
// Mutex: 排他ロック
let data = Arc::new(Mutex::new(vec![1, 2, 3]));
let data_clone = Arc::clone(&data);
thread::spawn(move || {
let mut guard = data_clone.lock().unwrap();
guard.push(4);
// guard がドロップされるとロック解除
});
チャネルの種類
// mpsc: Multiple Producer, Single Consumer
let (tx, rx) = mpsc::channel();
// 複数の送信者
let tx2 = tx.clone();
// 単一の受信者
for msg in rx {
println!("{}", msg);
}
スレッドプールのアーキテクチャ
┌─────────────┐
│ ThreadPool │
├─────────────┤ ┌─────────────┐
│ sender ─────┼─────→│ Channel │
│ workers[] │ └─────┬───────┘
└─────────────┘ │
▼
┌────────────────────────┐
│ Mutex<Receiver> │
└────────────────────────┘
▲ ▲ ▲
┌───────────┼────┼────┼───────────┐
│ │ │ │ │
┌────┴────┐ ┌────┴────┐ ┌────┴────┐
│ Worker1 │ │ Worker2 │ │ Worker3 │
└─────────┘ └─────────┘ └─────────┘
よくある間違い
1. ロックの保持しすぎ
// 間違い: ロックを長時間保持
let mut guard = data.lock().unwrap();
expensive_computation(&guard); // ロック保持中
drop(guard);
// 正しい: 必要な時だけロック
let value = {
let guard = data.lock().unwrap();
guard.clone()
};
expensive_computation(&value); // ロック解除済み
2. デッドロック
// 間違い: ロック順序が不定
let a = Arc::new(Mutex::new(1));
let b = Arc::new(Mutex::new(2));
// スレッド1
let _a = a.lock();
let _b = b.lock(); // スレッド2がbを持っていたらデッドロック
// 正しい: 常に同じ順序
fn lock_both(a: &Mutex<i32>, b: &Mutex<i32>) -> (MutexGuard<i32>, MutexGuard<i32>) {
let a = a.lock().unwrap();
let b = b.lock().unwrap();
(a, b)
}
3. チャネルの送信側をドロップしない
// 間違い: 受信ループが終了しない
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
for item in rx { // 永遠にブロック
process(item);
}
});
// tx がドロップされない限り、ループは終了しない
// 正しい: 明示的にドロップ
drop(tx); // これで rx のループが終了
パフォーマンス最適化
アトミック操作
use std::sync::atomic::{AtomicU64, Ordering};
struct AtomicCounter {
count: AtomicU64,
}
impl AtomicCounter {
fn new() -> Self {
Self { count: AtomicU64::new(0) }
}
fn increment(&self) {
self.count.fetch_add(1, Ordering::Relaxed);
}
fn get(&self) -> u64 {
self.count.load(Ordering::Relaxed)
}
}
RwLock(読み書きロック)
use std::sync::RwLock;
let data = RwLock::new(vec![1, 2, 3]);
// 複数スレッドが同時に読み取り可能
let read1 = data.read().unwrap();
let read2 = data.read().unwrap();
// 書き込みは排他的
let mut write = data.write().unwrap();
write.push(4);
パーキングロット
// 標準のMutexより効率的な実装
// parking_lot クレート
use parking_lot::Mutex;
let data = Mutex::new(0);
// 標準のMutexと同じ使い方
// ただしpoisonが発生しない
発展トピック
Condvar(条件変数)
use std::sync::{Condvar, Mutex};
let pair = Arc::new((Mutex::new(false), Condvar::new()));
// 待機側
let (lock, cvar) = &*pair;
let mut started = lock.lock().unwrap();
while !*started {
started = cvar.wait(started).unwrap();
}
// 通知側
let (lock, cvar) = &*pair;
*lock.lock().unwrap() = true;
cvar.notify_one();
Barrier
use std::sync::Barrier;
let barrier = Arc::new(Barrier::new(4));
for _ in 0..4 {
let b = Arc::clone(&barrier);
thread::spawn(move || {
// 準備処理
b.wait(); // 全スレッドがここに到達するまで待機
// 本処理(全スレッドが同時に開始)
});
}