Day 5: 並行処理入門 - 解説
Rustの並行処理が安全な理由
1. 型システムによる保証
Rustは型システムを使って、並行処理のバグをコンパイル時に防ぎます。他の言語では実行時にしか発見できないバグを、コードを書いている段階で検出できます。
// ❌ これはコンパイルエラー
let mut data = vec![1, 2, 3];
thread::spawn(|| {
data.push(4); // エラー: dataの所有権がない
});
data.push(5); // エラー: 並行アクセスの可能性
コンパイラのエラーメッセージ:
error[E0373]: closure may outlive the current function
|
| thread::spawn(|| {
| ^^ may outlive borrowed value `data`
このエラーは「スレッドがdataより長生きする可能性があり、dangling pointerになる」と教えてくれます。
2. Send と Sync の深堀り
Send トレイト
定義: T: Send は「型Tの値の所有権をスレッド間で安全に移動できる」ことを意味します。
// Sendの実装を確認
fn assert_send<T: Send>() {}
fn main() {
assert_send::<i32>(); // ✅ OK
assert_send::<String>(); // ✅ OK
assert_send::<Vec<i32>>(); // ✅ OK
assert_send::<Arc<i32>>(); // ✅ OK
// assert_send::<Rc<i32>>(); // ❌ エラー: Rc は Send ではない
// assert_send::<*const i32>(); // ❌ エラー: 生ポインタは Send ではない
}
なぜRcはSendでないか?
Rcの参照カウントはアトミック操作ではないため、複数スレッドで同時に増減すると、データ競合が発生します:
スレッド1 スレッド2
│ │
├─ count を読む (2) │
│ ├─ count を読む (2)
├─ count + 1 = 3 │
│ ├─ count + 1 = 3
├─ count に 3 を書く │
│ ├─ count に 3 を書く
↓ ↓
count = 3 (期待値: 4)
Sync トレイト
定義: T: Sync は「型Tの値への参照を複数スレッドから安全に共有できる」ことを意味します。
数学的に: T: Sync ⇔ &T: Send
// Syncの実装を確認
fn assert_sync<T: Sync>() {}
fn main() {
assert_sync::<i32>(); // ✅ OK
assert_sync::<Arc<i32>>(); // ✅ OK
assert_sync::<Mutex<i32>>(); // ✅ OK
// assert_sync::<Cell<i32>>(); // ❌ エラー: Cell は Sync ではない
// assert_sync::<RefCell<i32>>(); // ❌ エラー: RefCell は Sync ではない
}
なぜCellはSyncでないか?
Cellは内部可変性を提供しますが、スレッドセーフではありません:
use std::cell::Cell;
let cell = Cell::new(0);
// ❌ これはコンパイルエラー
thread::spawn(|| {
cell.set(10); // エラー: Cell<i32> は Sync ではない
});
3. 所有権システムとライフタイム
Rustの所有権システムは、並行処理でも一貫して機能します:
let data = vec![1, 2, 3];
// moveキーワードで所有権をクロージャに移動
let handle = thread::spawn(move || {
println!("{:?}", data); // ✅ OK: 所有権を持っている
});
// println!("{:?}", data); // ❌ エラー: 所有権は移動済み
handle.join().unwrap();
ライフタイムの視覚化:
時間軸 →
main関数: ├──────────────┤
│ │
data: ├──────┤ │
↓ move │
子スレッド: ├───────┤
│ │
data(moved): ├───────┤
Arc と Rc の違い
メモリレイアウトの比較
Rc<T>:
┌─────────────────┐
│ データ │
│ 参照カウント │ ← 非アトミック(通常の整数)
│ 弱参照カウント │
└─────────────────┘
Arc<T>:
┌─────────────────┐
│ データ │
│ 参照カウント │ ← アトミック(CPU命令レベルで同期)
│ 弱参照カウント │
└─────────────────┘
パフォーマンス比較
use std::rc::Rc;
use std::sync::Arc;
use std::time::Instant;
fn benchmark_rc() {
let rc = Rc::new(42);
let start = Instant::now();
for _ in 0..1_000_000 {
let _clone = Rc::clone(&rc);
}
println!("Rc: {:?}", start.elapsed());
}
fn benchmark_arc() {
let arc = Arc::new(42);
let start = Instant::now();
for _ in 0..1_000_000 {
let _clone = Arc::clone(&arc);
}
println!("Arc: {:?}", start.elapsed());
}
典型的な結果:
Rc: 15ms (高速)
Arc: 45ms (やや遅い - アトミック操作のコスト)
使い分けの原則:
- シングルスレッド →
Rc(高速) - マルチスレッド →
Arc(安全) - 排他制御: 同時に1つのスレッドだけがアクセス
- 内部可変性: イミュータブルな参照から値を変更
Mutex の内部動作
Mutexのロックメカニズム
Mutexは2つの役割を果たします:
use std::sync::Mutex;
let mutex = Mutex::new(0);
// lock() は MutexGuard<T> を返す
let mut guard = mutex.lock().unwrap();
// MutexGuard は Deref と DerefMut を実装
*guard += 1;
// スコープを抜けると自動的にロック解放(Drop実装)
Mutexの状態遷移図
初期状態(ロック解放)
│
│ lock()
↓
ロック取得 ────── 他のスレッドは待機
│ (ブロッキング)
│ drop(guard)
↓
ロック解放 ────── 待機中のスレッドが取得
MutexGuard の設計
MutexGuardはRAIIパターン(Resource Acquisition Is Initialization)の好例です:
pub struct MutexGuard<'a, T> {
mutex: &'a Mutex<T>,
// ... 内部フィールド
}
impl<T> Deref for MutexGuard<'_, T> {
type Target = T;
fn deref(&self) -> &T { /* ... */ }
}
impl<T> DerefMut for MutexGuard<'_, T> {
fn deref_mut(&mut self) -> &mut T { /* ... */ }
}
impl<T> Drop for MutexGuard<'_, T> {
fn drop(&mut self) {
// ここでロックを解放
}
}
RAII の利点:
panic!が発生しても自動的にロック解放- 明示的な
unlock()呼び出し不要 - スコープでロックの寿命を制御
デッドロックの具体例と対策
デッドロックの例:
let mutex1 = Arc::new(Mutex::new(0));
let mutex2 = Arc::new(Mutex::new(0));
let m1 = Arc::clone(&mutex1);
let m2 = Arc::clone(&mutex2);
// スレッド1
thread::spawn(move || {
let _g1 = m1.lock().unwrap(); // mutex1 をロック
thread::sleep(Duration::from_millis(10));
let _g2 = m2.lock().unwrap(); // mutex2 をロック(待機)
});
// スレッド2
let _g2 = mutex2.lock().unwrap(); // mutex2 をロック
thread::sleep(Duration::from_millis(10));
let _g1 = mutex1.lock().unwrap(); // mutex1 をロック(待機)
// ⚠️ デッドロック発生
デッドロックの視覚化:
スレッド1 mutex1 mutex2 スレッド2
│ │ │ │
├─ lock() ──→ 取得 │ │
│ │ │ │
│ │ │ ←─ lock() ┤
│ │ 取得 │
│ │ │ │
├─ lock() ───────────────→ 待機 ←───────── lock() ┤
↓ ↓
デッドロック
対策1: ロックの順序を統一
// ✅ 常に mutex1 → mutex2 の順でロック
fn safe_lock(mutex1: &Mutex<i32>, mutex2: &Mutex<i32>) {
let _g1 = mutex1.lock().unwrap();
let _g2 = mutex2.lock().unwrap();
// 処理
}
対策2: try_lock を使う
// タイムアウト付きロック取得
loop {
if let Ok(g1) = mutex1.try_lock() {
if let Ok(g2) = mutex2.try_lock() {
// 両方取得成功
break;
}
}
// 失敗したら少し待って再試行
thread::sleep(Duration::from_millis(1));
}
チャネルの設計哲学
Goの影響
Rustのチャネルは、Goの「CSP(Communicating Sequential Processes)」モデルに影響を受けています:
> Goの格言: "Do not communicate by sharing memory; instead, share memory by communicating." > (メモリを共有して通信するのではなく、通信することでメモリを共有せよ)
チャネルの型
use std::sync::mpsc;
// 非同期チャネル(バッファ無制限)
let (tx, rx) = mpsc::channel();
// 同期チャネル(バッファサイズ指定)
let (tx, rx) = mpsc::sync_channel(10); // バッファサイズ10
チャネルと所有権
チャネルの最大の利点は、所有権の移動により、データ競合を根本的に防ぐことです:
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let data = vec![1, 2, 3];
tx.send(data).unwrap();
// ここで data は移動済みなので使用不可
// println!("{:?}", data); // ❌ エラー
});
let received = rx.recv().unwrap();
// ここで received が新しい所有者
println!("{:?}", received); // ✅ OK
所有権の移動の視覚化:
送信側スレッド チャネル 受信側スレッド
│ │ │
data (所有) │ │
│ │ │
├─ send(data) ───→ キュー │
│ [data] │
(所有権失う) │ │
× │ │
│ recv() ┤
│ │
空 data (所有)
チャネルのパフォーマンス特性
use std::sync::mpsc;
use std::time::Instant;
fn main() {
let (tx, rx) = mpsc::channel();
let start = Instant::now();
// 送信スレッド
thread::spawn(move || {
for i in 0..1_000_000 {
tx.send(i).unwrap();
}
});
// 受信
let mut sum = 0;
for _ in 0..1_000_000 {
sum += rx.recv().unwrap();
}
println!("Time: {:?}", start.elapsed());
println!("Sum: {}", sum);
}
典型的な結果: 100万メッセージを約200ms で送受信(1メッセージあたり200ns)
パターン選択のガイドライン
Arc + Mutex vs チャネル
| 基準 | Arc + Mutex | チャネル |
|---|---|---|
| **使用例** | 共有状態、キャッシュ | プロデューサー/コンシューマー |
| **データフロー** | 双方向 | 単方向 |
| **結合度** | 密結合 | 疎結合 |
| **パフォーマンス** | ロック競合あり | コピーのオーバーヘッド |
| **デバッグ** | デッドロックのリスク | 比較的安全 |
決定木
データを共有したい
│
├─ 読み取りのみ?
│ └─ Yes → Arc<T>
│
├─ 書き込みあり?
│ ├─ 頻繁な更新?
│ │ └─ Yes → Arc<Mutex<T>>
│ │
│ └─ 読み取りが多い?
│ └─ Yes → Arc<RwLock<T>>
│
└─ データを渡したい?
└─ チャネル(mpsc::channel)
メンタルモデル:並行処理の考え方
1. 所有権ベースの思考
従来の言語(C/Java/Python):
すべてのスレッドがすべてのメモリにアクセス可能
↓
開発者が手動で同期を管理
↓
バグが実行時に発見される
Rust:
所有権システムがアクセスを制限
↓
コンパイラが同期の必要性を検出
↓
バグがコンパイル時に発見される
2. 並行処理のレイヤー
レイヤー4: アプリケーションロジック
├─ ビジネスルール
└─ データ処理
レイヤー3: 並行処理パターン
├─ チャネル
├─ Arc + Mutex
└─ 非同期 (async/await)
レイヤー2: 同期プリミティブ
├─ Mutex, RwLock
├─ Atomic
└─ Condvar
レイヤー1: OSスレッド / 非同期ランタイム
├─ std::thread
└─ tokio::task
Rustの美しさは、レイヤー3以上で作業すれば、低レベルの詳細を気にしなくて良いことです。
3. Arcの参照カウント視覚化
let arc1 = Arc::new(42); // count: 1
let arc2 = Arc::clone(&arc1); // count: 2
let arc3 = Arc::clone(&arc1); // count: 3
// メモリの状態:
//
// ┌──────────────────┐
// │ データ: 42 │
// │ strong_count: 3 │
// │ weak_count: 0 │
// └──────────────────┘
// ↑ ↑ ↑
// │ │ │
// arc1 arc2 arc3
drop(arc2); // count: 2
// メモリの状態:
// ┌──────────────────┐
// │ データ: 42 │
// │ strong_count: 2 │ ← 減った
// │ weak_count: 0 │
// └──────────────────┘
// ↑ ↑
// │ │
// arc1 arc3
実践的な設計パターン
パターン1: ワーカープール
use std::sync::mpsc;
use std::sync::{Arc, Mutex};
use std::thread;
struct WorkerPool {
workers: Vec<thread::JoinHandle<()>>,
sender: mpsc::Sender<Task>,
}
type Task = Box<dyn FnOnce() + Send + 'static>;
impl WorkerPool {
fn new(size: usize) -> Self {
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let workers = (0..size)
.map(|id| {
let receiver = Arc::clone(&receiver);
thread::spawn(move || loop {
let task = receiver.lock().unwrap().recv();
match task {
Ok(task) => {
println!("Worker {} executing task", id);
task();
}
Err(_) => break,
}
})
})
.collect();
WorkerPool { workers, sender }
}
fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
self.sender.send(Box::new(f)).unwrap();
}
}
パターン2: Actor モデル
use std::sync::mpsc;
use std::thread;
struct Actor {
sender: mpsc::Sender<Message>,
}
enum Message {
Increment,
Decrement,
GetValue(mpsc::Sender<i32>),
}
impl Actor {
fn new() -> Self {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let mut value = 0;
for msg in rx {
match msg {
Message::Increment => value += 1,
Message::Decrement => value -= 1,
Message::GetValue(reply) => {
reply.send(value).unwrap();
}
}
}
});
Actor { sender: tx }
}
fn increment(&self) {
self.sender.send(Message::Increment).unwrap();
}
fn get_value(&self) -> i32 {
let (tx, rx) = mpsc::channel();
self.sender.send(Message::GetValue(tx)).unwrap();
rx.recv().unwrap()
}
}
セルフチェック質問
理解度を確認するための質問です:
基礎レベル
- Q:
Arc::clone(&arc)は何をしますか?
- Q:
Mutex::lock()の戻り値の型は?
Result, PoisonError> です。- Q: なぜ
Rcはスレッド間で共有できないのですか?
中級レベル
- Q:
Arcと>> Mutexの違いは?>>
- Q: チャネルでtxをdropしないとどうなりますか?
- Q:
SendとSyncの関係は?
T: Sync ⇔ &T: Send。型がSyncなら、その参照はSend。上級レベル
- Q: なぜ
Arcは> Cloneを実装していますか?
Arc が Clone を実装し、Mutex の Clone は不要(Arc経由で共有)。- Q: デッドロックを検出する方法は?
try_lock() を使うか、ロックの順序を統一。または parking_lot クレートのデッドロック検出機能。- Q:
MutexよりRwLockが遅くなる場合は?
RwLock は読み取り/書き込みの調整にオーバーヘッドがあります。次のステップ
Day 5の課題を完了したら:
- 復習: 3つのExerciseを見直し、パターンを理解
- 応用: 自分のプロジェクトで並行処理を使ってみる
- 準備: Day 6のキャップストーンで全概念を統合
参考資料
深掘り学習
- Rust Atomics and Locks - Mara Bos著(並行処理の詳細)
- The Rustonomicon - Unsafe Rustと低レベル並行処理
- Crossbeam documentation - 高度な並行処理ライブラリ
ビデオ
Day 6で、これまでの全てを統合したプロジェクトに挑戦しましょう!