Day 5: 並行処理入門 - 背景知識
Fearless Concurrency - 恐れのない並行処理
Rustの並行処理は「Fearless Concurrency」という哲学のもとに設計されています。これは「コンパイル時に並行処理のバグを防げるため、実行時の恐れがない」という意味です。
並行処理の歴史と課題
1. 並行処理が必要な理由
現代のコンピュータはマルチコアプロセッサが標準です:
- パフォーマンス: 複数のコアを活用して計算を高速化
- レスポンシブ性: UIをブロックせずにバックグラウンド処理
- スケーラビリティ: サーバーで数千の接続を同時処理
- リソース効率: I/O待機中に他の作業を実行
シングルスレッド:
Task A ████████████ (12秒)
Task B ████████████ (12秒)
合計: 24秒
マルチスレッド:
Task A ████████████
Task B ████████████
合計: 12秒
2. 従来の並行処理の問題点
多くのプログラミング言語では、並行処理は「難しく危険」とされてきました:
データ競合 (Data Race)
// C言語の例 - データ競合が発生
int counter = 0;
void* increment(void* arg) {
for (int i = 0; i < 100000; i++) {
counter++; // ⚠️ 複数スレッドから同時アクセス
}
return NULL;
}
// 2つのスレッドで実行すると、
// 期待値: 200000
// 実際: 147382 (実行ごとに異なる!)
デッドロック (Deadlock)
// Javaの例 - デッドロックが発生
Object lock1 = new Object();
Object lock2 = new Object();
Thread t1 = new Thread(() -> {
synchronized(lock1) {
Thread.sleep(10);
synchronized(lock2) { /* 作業 */ }
}
});
Thread t2 = new Thread(() -> {
synchronized(lock2) { // lock2を先に取得
Thread.sleep(10);
synchronized(lock1) { /* 作業 */ } // ⚠️ デッドロック
}
});
競合状態 (Race Condition)
# Pythonの例 - 競合状態
balance = 1000
def withdraw(amount):
global balance
if balance >= amount:
# ⚠️ ここで他のスレッドが実行される可能性
balance -= amount
return True
return False
# 2つのスレッドで同時に900円引き出すと、
# 残高がマイナスになる可能性がある
3. Rustの解決策
Rustは型システムと所有権システムで、これらの問題をコンパイル時に検出します:
// ❌ コンパイルエラー!
let mut data = vec![1, 2, 3];
std::thread::spawn(|| {
data.push(4); // エラー: dataの所有権がない
});
println!("{:?}", data);
error[E0373]: closure may outlive the current function
--> src/main.rs:4:23
|
4 | std::thread::spawn(|| {
| ^^ may outlive borrowed value `data`
5 | data.push(4);
| ---- `data` is borrowed here
このエラーメッセージは「スレッドがdataより長生きする可能性がある」と教えてくれます。
Rustの並行処理の仕組み
1. Send と Sync トレイト
Rustは2つの特別なマーカートレイトで並行処理の安全性を保証します:
Send トレイト
定義: スレッド間で所有権を移動できる型
// Send を実装している型
let data = vec![1, 2, 3]; // Vec<i32> は Send
std::thread::spawn(move || {
println!("{:?}", data); // ✅ OK: 所有権を移動
});
// Send を実装していない型
let rc = Rc::new(5); // Rc<i32> は Send ではない
std::thread::spawn(move || {
println!("{}", rc); // ❌ コンパイルエラー
});
なぜRcはSendではない?
メインスレッド: 子スレッド:
Rc { count: 2 } -----> Rc { count: 2 }
↓ ↓
count-- count--
↓ ↓
count = ? ⚠️ データ競合!
Rcの参照カウントはスレッドセーフではないため、複数スレッドで共有できません。
Sync トレイト
定義: 複数のスレッドから安全に参照できる型
// T が Sync ⇔ &T が Send
// i32 は Sync
let x = 42;
std::thread::spawn(|| {
println!("{}", x); // ✅ OK: i32 はコピー可能
});
// Cell<i32> は Sync ではない
use std::cell::Cell;
let cell = Cell::new(42);
std::thread::spawn(|| {
cell.set(10); // ❌ コンパイルエラー
});
主要な型のSend/Sync実装
| 型 | Send | Sync | 理由 |
|---|---|---|---|
| `i32`, `String` | ✅ | ✅ | イミュータブルで安全 |
| `Vec |
✅ (Tが Send) | ✅ (Tが Sync) | 所有権ベース |
| `Rc |
❌ | ❌ | 参照カウントが非アトミック |
| `Arc |
✅ (Tが Send+Sync) | ✅ (Tが Sync) | アトミックな参照カウント |
| `Cell |
✅ (Tが Send) | ❌ | 内部可変性が非同期 |
| `Mutex |
✅ (Tが Send) | ✅ (Tが Send) | ロックで保護 |
2. 所有権による安全性
Rustの所有権システムは、並行処理でも機能します:
let data = vec![1, 2, 3];
// moveキーワードで所有権を移動
let handle = std::thread::spawn(move || {
println!("{:?}", data); // dataの所有権はこのスレッドに
});
// println!("{:?}", data); // ❌ エラー: dataはすでに移動済み
handle.join().unwrap();
所有権の移動とライフタイム
メインスレッド: 子スレッド:
┌──────────┐
│ data │ ──move──> ┌──────────┐
│ (所有者) │ │ data │
└──────────┘ │ (新所有者)│
× └──────────┘
使用不可 ✓ 使用可
Rustの並行処理パターン
1. スレッド (Threads)
基本的なスレッド生成
use std::thread;
use std::time::Duration;
fn main() {
// スレッドを生成
let handle = thread::spawn(|| {
for i in 1..10 {
println!("子スレッド: {}", i);
thread::sleep(Duration::from_millis(1));
}
});
// メインスレッドも並行して実行
for i in 1..5 {
println!("メインスレッド: {}", i);
thread::sleep(Duration::from_millis(1));
}
// 子スレッドの完了を待つ
handle.join().unwrap();
}
出力例:
メインスレッド: 1
子スレッド: 1
メインスレッド: 2
子スレッド: 2
子スレッド: 3
メインスレッド: 3
...
JoinHandle の重要性
// ❌ 悪い例: スレッドを待たない
thread::spawn(|| {
println!("この出力は表示されないかも");
});
// メインスレッドが先に終了すると、子スレッドも強制終了
// ✅ 良い例: joinで完了を待つ
let handle = thread::spawn(|| {
println!("この出力は必ず表示される");
});
handle.join().unwrap();
スレッドへのデータ渡し
let numbers = vec![1, 2, 3];
let handle = thread::spawn(move || {
let sum: i32 = numbers.iter().sum();
println!("合計: {}", sum);
});
handle.join().unwrap();
2. Arc (Atomic Reference Counted)
問題: 複数のスレッドで同じデータを読みたい
// ❌ これはコンパイルエラー
let data = vec![1, 2, 3];
let handle1 = thread::spawn(move || println!("{:?}", data));
let handle2 = thread::spawn(move || println!("{:?}", data)); // エラー: dataは既に移動済み
解決策: Arcを使う
use std::sync::Arc;
let data = Arc::new(vec![1, 2, 3]);
let data1 = Arc::clone(&data); // 参照カウントを増やす
let handle1 = thread::spawn(move || {
println!("スレッド1: {:?}", data1);
});
let data2 = Arc::clone(&data);
let handle2 = thread::spawn(move || {
println!("スレッド2: {:?}", data2);
});
println!("メイン: {:?}", data);
handle1.join().unwrap();
handle2.join().unwrap();
Arcの仕組み
初期状態:
Arc { data: [1,2,3], count: 1 }
Arc::clone後:
Arc { data: [1,2,3], count: 3 }
↓ ↓ ↓
data data1 data2
drop後:
count: 3 → 2 → 1 → 0 (メモリ解放)
RcとArcの違い
use std::rc::Rc;
use std::sync::Arc;
// Rc: シングルスレッド用(高速)
let rc = Rc::new(5);
let rc_clone = Rc::clone(&rc);
// 参照カウントの増減は通常の加算/減算
// Arc: マルチスレッド用(やや遅い)
let arc = Arc::new(5);
let arc_clone = Arc::clone(&arc);
// 参照カウントの増減はアトミック操作(CPU命令)
3. Mutex (Mutual Exclusion)
問題: 複数のスレッドで同じデータを書き換えたい
// ❌ Arcだけでは書き換えられない
let data = Arc::new(vec![1, 2, 3]);
let data1 = Arc::clone(&data);
thread::spawn(move || {
data1.push(4); // エラー: Arc<T>はイミュータブル
});
解決策: Arcパターン
use std::sync::{Arc, Mutex};
let data = Arc::new(Mutex::new(vec![1, 2, 3]));
let data1 = Arc::clone(&data);
let handle1 = thread::spawn(move || {
let mut vec = data1.lock().unwrap(); // ロックを取得
vec.push(4);
// ロックは自動的に解放(Drop)
});
let data2 = Arc::clone(&data);
let handle2 = thread::spawn(move || {
let mut vec = data2.lock().unwrap();
vec.push(5);
});
handle1.join().unwrap();
handle2.join().unwrap();
println!("{:?}", data.lock().unwrap()); // [1, 2, 3, 4, 5]
Mutexのロックメカニズム
スレッド1 Mutex スレッド2
│ │ │
├─ lock() ──────> ロック取得 │
│ (所有権獲得) │
│ │ │
│ │ <── lock() (待機)
├─ vec.push(4) │ │
│ │ │
├─ drop(guard) ──> ロック解放 │
│ │ │
│ │ <── ロック取得
│ │ │
│ │ vec.push(5)
Mutexの注意点
// ⚠️ デッドロックの例
let data = Arc::new(Mutex::new(0));
let guard1 = data.lock().unwrap(); // ロックを取得
let guard2 = data.lock().unwrap(); // ⚠️ 同じスレッドで再度ロック取得 → デッドロック
// ✅ スコープでロックを制限
{
let mut guard = data.lock().unwrap();
*guard += 1;
} // ここでロックが解放される
4. チャネル (Channels)
哲学: 「メモリを共有するのではなく、メッセージを共有せよ」
use std::sync::mpsc; // multiple producer, single consumer
use std::thread;
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let message = String::from("Hello from thread!");
tx.send(message).unwrap();
// messageはsendで移動されたので、以降使用不可
});
let received = rx.recv().unwrap();
println!("受信: {}", received);
}
チャネルの種類
// 1. 非同期チャネル(バッファなし)
let (tx, rx) = mpsc::channel();
tx.send(1).unwrap(); // すぐに成功
// 2. 同期チャネル(バッファサイズ指定)
let (tx, rx) = mpsc::sync_channel(0); // バッファサイズ0
thread::spawn(move || {
tx.send(1).unwrap(); // 受信されるまでブロック
});
thread::sleep(Duration::from_secs(1));
println!("{}", rx.recv().unwrap());
複数の送信者
let (tx, rx) = mpsc::channel();
for i in 0..5 {
let tx_clone = tx.clone();
thread::spawn(move || {
tx_clone.send(i).unwrap();
});
}
drop(tx); // 元のtxをdrop(重要!)
for received in rx {
println!("受信: {}", received);
}
チャネル vs 共有メモリ
| パターン | メリット | デメリット | 適用例 |
|---|---|---|---|
| チャネル | ・データ競合なし ・疎結合 |
・コピーのオーバーヘッド | ・プロデューサー/コンシューマー ・パイプライン処理 |
| Arc+Mutex | ・メモリ効率 ・直接アクセス |
・デッドロックのリスク ・密結合 |
・共有状態 ・頻繁な読み書き |
実世界での活用事例
1. Discord - Rustで5億5000万ユーザーの負荷を処理
背景: DiscordはGoで実装されていたメッセージルーティングサービスをRustに書き換えました。
課題: ガベージコレクション(GC)による2分ごとのレイテンシスパイク
Go実装:
レイテンシ: ────────┃────────┃────────┃
(通常) GC GC GC
(遅延) (遅延) (遅延)
Rust実装:
レイテンシ: ─────────────────────────────
(常に低レイテンシ)
成果:
- レイテンシの安定性向上
- メモリ使用量が10%削減
- CPU使用率が安定
使用技術: tokio(非同期ランタイム)、Arc、チャネル
参考: Why Discord is switching from Go to Rust
2. Cloudflare - 1日4兆リクエストをRustで処理
背景: CDNプロキシサーバーをCからRustに置き換え
Rustを選んだ理由:
- メモリ安全性(Cのバッファオーバーフロー脆弱性を排除)
- 並行処理の安全性
- パフォーマンス(Cと同等)
アーキテクチャ:
リクエスト → [Rustプロキシ] ──┬── オリジンサーバー1
4兆/日 ↓ ├── オリジンサーバー2
Arc<Config> └── オリジンサーバーN
↓
複数のワーカースレッド
参考: Cloudflare's Rust-based Pingora
3. AWS Lambda - Firecracker VMM
Firecracker: AWSのサーバーレス(Lambda, Fargate)を支えるマイクロVM
Rustが必要な理由:
- セキュリティ: 数千のVMを1台のホストで実行
- パフォーマンス: 125ms以下の起動時間
- 並行処理: 複数のVMを安全に管理
並行処理の実装:
// 各VMは独立したスレッドで実行
let vm1 = Arc::new(Mutex::new(VirtualMachine::new()));
let vm2 = Arc::new(Mutex::new(VirtualMachine::new()));
thread::spawn(move || vm1.lock().unwrap().run());
thread::spawn(move || vm2.lock().unwrap().run());
4. Figma - デスクトップアプリのマルチスレッド処理
背景: Figmaのデスクトップアプリは大きなデザインファイルを処理
Rustの活用:
// ファイル読み込みを並行処理
let files = vec!["layer1.fig", "layer2.fig", "layer3.fig"];
let handles: Vec<_> = files.into_iter()
.map(|file| {
thread::spawn(move || {
load_layer(file)
})
})
.collect();
let layers: Vec<_> = handles.into_iter()
.map(|h| h.join().unwrap())
.collect();
成果: 10倍のパフォーマンス向上
参考: Rust in Production at Figma
5. Dropbox - 並行ファイル同期エンジン
Nucleus: Dropboxの新しい同期エンジン
並行処理の設計:
// ファイル変更の監視とアップロードを並行実行
let (file_tx, file_rx) = mpsc::channel();
// 監視スレッド
thread::spawn(move || {
for event in file_watcher {
file_tx.send(event).unwrap();
}
});
// アップロードワーカー(複数)
for _ in 0..4 {
let rx = file_rx.clone();
thread::spawn(move || {
for file in rx {
upload_file(file);
}
});
}
参考: Rewriting Dropbox sync engine in Rust
非同期プログラミング(async/await)
スレッド vs 非同期
// スレッドベース: OSスレッドを使用(重い)
for i in 0..1000 {
thread::spawn(move || {
// 各スレッドは約2MBのスタックメモリ
});
}
// 合計: 2GB以上のメモリ使用
// 非同期ベース: タスク(軽量)
for i in 0..1000 {
tokio::spawn(async move {
// タスクは数KBのメモリ
});
}
// 合計: 数MB程度のメモリ使用
Tokioランタイム
use tokio;
#[tokio::main]
async fn main() {
// 非同期関数を並行実行
let task1 = tokio::spawn(async {
// I/O処理など
tokio::time::sleep(Duration::from_secs(1)).await;
"task1 完了"
});
let task2 = tokio::spawn(async {
tokio::time::sleep(Duration::from_secs(1)).await;
"task2 完了"
});
let (result1, result2) = tokio::join!(task1, task2);
println!("{:?}, {:?}", result1, result2);
}
非同期が適している場面:
- I/O待ちが多い(ネットワーク、ファイル)
- 大量の並行接続(Webサーバー)
- メモリ効率が重要
スレッドが適している場面:
- CPU集約的な処理
- ブロッキングAPIの使用
- シンプルな並行処理
メンタルモデル:並行処理の考え方
1. 所有権ベースの並行処理
従来の言語:
スレッド1 ──→ データ ←── スレッド2
(競合の可能性)
Rust:
スレッド1 ──→ データ(所有)
スレッド2 ──→ 別のデータ(所有)
または
スレッド1 ──→ Arc ←── スレッド2
(参照カウント)
2. チャネルとメッセージパッシング
プロデューサー コンシューマー
│ │
├── メッセージ1 ──→ 受信
├── メッセージ2 ──→ 受信
└── メッセージ3 ──→ 受信
3. Arc + Mutex の階層
Arc: 共有所有権
└─ Mutex: 排他制御
└─ データ: 実際の値
よくある質問
Q1: ArcとMutexは常にセットで使う?
A: いいえ。読み取り専用ならArcだけで十分です:
// 読み取り専用 - Arcのみ
let config = Arc::new(Config { port: 8080 });
// 書き込みあり - Arc + Mutex
let counter = Arc::new(Mutex::new(0));
Q2: RwLock とは?
A: 読み取りロックと書き込みロックを分離:
use std::sync::RwLock;
let data = Arc::new(RwLock::new(vec![1, 2, 3]));
// 複数の読み取りは並行OK
let reader1 = data.read().unwrap();
let reader2 = data.read().unwrap();
// 書き込みは排他的
let mut writer = data.write().unwrap();
writer.push(4);
使い分け:
- 読み取りが多い →
RwLock - 書き込みが多い →
Mutex(シンプル)
Q3: スレッドプールは?
A: rayonクレートが便利です:
use rayon::prelude::*;
let sum: i32 = (0..1000)
.into_par_iter() // 並列イテレータ
.map(|x| x * x)
.sum();
学習チェックリスト
Day 5の課題に進む前に、以下を理解できているか確認しましょう:
参考資料
公式ドキュメント
- The Rust Book - Chapter 16: Fearless Concurrency
- Rust by Example - Threads
- std::sync module
- std::thread module
クレート
ブログ記事
- Discord: Why we switched from Go to Rust
- Cloudflare: Building Pingora
- AWS: Firecracker Design
- assignment.md: 課題に挑戦
- solution.md: 解答例を参照(詰まったら)
- explanation.md: 詳しい解説を読む
- evaluation.md: 自己評価
次のステップ
背景知識を理解したら、実際にコードを書いて学びましょう:
Fearless Concurrency の世界へようこそ!