Day 5: 並行処理入門 - 解答例
Exercise 1: スレッド生成と同期
基本実装
use std::thread;
use std::time::Duration;
fn main() {
// 10個のスレッドを生成し、JoinHandleを保存
let handles: Vec<_> = (0..10)
.map(|i| {
// moveクロージャで i の所有権をスレッドに移動
thread::spawn(move || {
println!("Thread {} started", i);
// 実際の作業をシミュレート
thread::sleep(Duration::from_millis(10));
println!("Thread {} finished", i);
i * 2 // 戻り値
})
})
.collect();
// 全てのスレッドの完了を待つ
for handle in handles {
// join()は Result<T> を返す
match handle.join() {
Ok(result) => println!("Thread returned: {}", result),
Err(e) => eprintln!("Thread panicked: {:?}", e),
}
}
println!("All threads completed!");
}
出力例:
Thread 0 started
Thread 1 started
Thread 2 started
...
Thread 0 finished
Thread returned: 0
Thread 1 finished
Thread returned: 2
...
All threads completed!
改良版: スレッドに名前を付ける
use std::thread;
fn main() {
let handles: Vec<_> = (0..10)
.map(|i| {
// スレッドビルダーで名前を設定
thread::Builder::new()
.name(format!("worker-{}", i))
.spawn(move || {
println!(
"Thread {:?} (index {}) is running",
thread::current().name(),
i
);
i * 2
})
.unwrap() // Result<JoinHandle> をunwrap
})
.collect();
for (i, handle) in handles.into_iter().enumerate() {
let result = handle.join().unwrap();
println!("Worker {} returned: {}", i, result);
}
}
最適化版: スレッドプールの考え方
use std::sync::mpsc;
use std::thread;
fn main() {
let (result_tx, result_rx) = mpsc::channel();
// ワーカースレッド(4つのみ)
let handles: Vec<_> = (0..4)
.map(|worker_id| {
let tx = result_tx.clone();
thread::spawn(move || {
// 各ワーカーが複数のタスクを処理
for task_id in 0..3 {
let result = worker_id * 10 + task_id;
tx.send((worker_id, task_id, result)).unwrap();
thread::sleep(std::time::Duration::from_millis(10));
}
})
})
.collect();
// 送信側のオリジナルをdrop(重要!)
drop(result_tx);
// 全ての結果を受信
for (worker, task, result) in result_rx {
println!("Worker {} completed task {}: result = {}", worker, task, result);
}
// 全スレッドの終了を待つ
for handle in handles {
handle.join().unwrap();
}
}
---
Exercise 2: Arc + Mutex による共有状態
基本実装
use std::sync::{Arc, Mutex};
use std::thread;
fn main() {
// Arc<Mutex<T>>で共有可能なミュータブルな状態を作成
let counter = Arc::new(Mutex::new(0));
let handles: Vec<_> = (0..10)
.map(|_| {
// Arcをクローン(参照カウントを増やす)
let counter = Arc::clone(&counter);
thread::spawn(move || {
// Mutexのロックを取得
let mut num = counter.lock().unwrap();
// 共有状態を更新
*num += 1;
// numがスコープを抜けるとロックが自動解放
})
})
.collect();
// 全スレッドの完了を待つ
for handle in handles {
handle.join().unwrap();
}
// 最終結果を表示
println!("Result: {}", *counter.lock().unwrap());
// 期待値: 10
}
改良版: より現実的な例(銀行口座)
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;
#[derive(Debug)]
struct BankAccount {
balance: i32,
}
impl BankAccount {
fn new(initial_balance: i32) -> Self {
BankAccount { balance: initial_balance }
}
fn deposit(&mut self, amount: i32) {
println!("Depositing {} yen", amount);
self.balance += amount;
}
fn withdraw(&mut self, amount: i32) -> bool {
if self.balance >= amount {
println!("Withdrawing {} yen", amount);
self.balance -= amount;
true
} else {
println!("Insufficient funds for {} yen", amount);
false
}
}
}
fn main() {
// 共有銀行口座
let account = Arc::new(Mutex::new(BankAccount::new(10000)));
let mut handles = vec![];
// 入金スレッド(3つ)
for i in 0..3 {
let account = Arc::clone(&account);
let handle = thread::spawn(move || {
thread::sleep(Duration::from_millis(i * 10));
let mut acc = account.lock().unwrap();
acc.deposit(1000);
});
handles.push(handle);
}
// 出金スレッド(5つ)
for i in 0..5 {
let account = Arc::clone(&account);
let handle = thread::spawn(move || {
thread::sleep(Duration::from_millis(i * 15));
let mut acc = account.lock().unwrap();
acc.withdraw(2000);
});
handles.push(handle);
}
// 全スレッド完了を待つ
for handle in handles {
handle.join().unwrap();
}
// 最終残高
let final_balance = account.lock().unwrap().balance;
println!("\n最終残高: {} yen", final_balance);
// 初期: 10000, 入金: +3000, 出金: -10000 (一部失敗)
}
最適化版: RwLockを使った読み取り最適化
use std::sync::{Arc, RwLock};
use std::thread;
fn main() {
// 読み取りが多い場合はRwLockが効率的
let data = Arc::new(RwLock::new(vec![1, 2, 3, 4, 5]));
let mut handles = vec![];
// 読み取りスレッド(多数)
for i in 0..10 {
let data = Arc::clone(&data);
let handle = thread::spawn(move || {
// read()は複数同時にロック取得可能
let vec = data.read().unwrap();
let sum: i32 = vec.iter().sum();
println!("Reader {}: sum = {}", i, sum);
});
handles.push(handle);
}
// 書き込みスレッド(少数)
for i in 0..2 {
let data = Arc::clone(&data);
let handle = thread::spawn(move || {
thread::sleep(std::time::Duration::from_millis(10));
// write()は排他的
let mut vec = data.write().unwrap();
vec.push(i + 10);
println!("Writer {} added value", i);
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
println!("Final data: {:?}", data.read().unwrap());
}
---
Exercise 3: チャネルによるメッセージパッシング
基本実装
use std::sync::mpsc;
use std::thread;
fn main() {
// チャネルを作成(送信側tx、受信側rx)
let (tx, rx) = mpsc::channel();
// 送信スレッド
thread::spawn(move || {
let message = String::from("Hello from thread!");
// 所有権ごと送信(messageはmoveされる)
tx.send(message).unwrap();
// println!("{}", message); // ❌ エラー: messageは既に移動済み
});
// メインスレッドで受信
let received = rx.recv().unwrap(); // ブロッキング受信
println!("Received: {}", received);
}
改良版: 複数メッセージの送受信
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let messages = vec![
"Hello",
"from",
"the",
"thread",
];
for msg in messages {
tx.send(msg).unwrap();
// メッセージ間に遅延
thread::sleep(Duration::from_millis(100));
}
});
// イテレータとして受信
for received in rx {
println!("Got: {}", received);
}
// 送信側がdropされるとイテレータが終了
}
最適化版: 複数プロデューサー(MPSC)
use std::sync::mpsc;
use std::thread;
fn main() {
let (tx, rx) = mpsc::channel();
let mut handles = vec![];
// 5つのプロデューサースレッド
for id in 0..5 {
// 送信側をクローン
let thread_tx = tx.clone();
let handle = thread::spawn(move || {
for i in 0..3 {
let message = format!("Producer {} - Message {}", id, i);
thread_tx.send(message).unwrap();
thread::sleep(std::time::Duration::from_millis(10));
}
});
handles.push(handle);
}
// 元のtxをdrop(重要!これがないとrxが永遠に待つ)
drop(tx);
// 全メッセージを受信
for received in rx {
println!("{}", received);
}
// スレッドの完了を待つ
for handle in handles {
handle.join().unwrap();
}
}
応用例: パイプライン処理
use std::sync::mpsc;
use std::thread;
fn main() {
// ステージ1: データ生成
let (tx1, rx1) = mpsc::channel();
thread::spawn(move || {
for i in 1..=10 {
tx1.send(i).unwrap();
}
});
// ステージ2: データ変換(2倍)
let (tx2, rx2) = mpsc::channel();
thread::spawn(move || {
for received in rx1 {
let doubled = received * 2;
println!("Stage 2: {} -> {}", received, doubled);
tx2.send(doubled).unwrap();
}
});
// ステージ3: データ集計
let (tx3, rx3) = mpsc::channel();
thread::spawn(move || {
let mut sum = 0;
for received in rx2 {
sum += received;
println!("Stage 3: current sum = {}", sum);
tx3.send(sum).unwrap();
}
});
// 最終結果
let mut final_sum = 0;
for received in rx3 {
final_sum = received;
}
println!("\nFinal result: {}", final_sum);
// 期待値: (1+2+...+10) * 2 = 110
}
---
よくある間違いと対策
間違い 1: Arcなしで共有しようとする
// ❌ コンパイルエラー
let data = vec![1, 2, 3];
let handle1 = thread::spawn(move || println!("{:?}", data));
let handle2 = thread::spawn(move || println!("{:?}", data)); // エラー: dataは既に移動
// ✅ 正しい方法
let data = Arc::new(vec![1, 2, 3]);
let data1 = Arc::clone(&data);
let data2 = Arc::clone(&data);
let handle1 = thread::spawn(move || println!("{:?}", data1));
let handle2 = thread::spawn(move || println!("{:?}", data2));
間違い 2: ロックのスコープが長すぎる
// ❌ 悪い例: ロックを長時間保持
let data = Arc::new(Mutex::new(0));
let guard = data.lock().unwrap();
// 長い処理...
thread::sleep(Duration::from_secs(10)); // ⚠️ 他のスレッドがブロックされる
drop(guard);
// ✅ 良い例: スコープでロックを制限
{
let mut guard = data.lock().unwrap();
*guard += 1;
} // ここでロック解放
// 長い処理...
thread::sleep(Duration::from_secs(10));
間違い 3: joinを忘れる
// ❌ 悪い例: スレッドを待たない
for i in 0..10 {
thread::spawn(move || {
println!("Thread {}", i);
});
}
// メインスレッドが先に終了すると、子スレッドも強制終了
// ✅ 良い例: joinで完了を待つ
let handles: Vec<_> = (0..10)
.map(|i| thread::spawn(move || println!("Thread {}", i)))
.collect();
for handle in handles {
handle.join().unwrap();
}
間違い 4: チャネルの送信側をdropし忘れる
// ❌ 悪い例
let (tx, rx) = mpsc::channel();
for i in 0..5 {
let tx_clone = tx.clone();
thread::spawn(move || tx_clone.send(i).unwrap());
}
// txがまだ生きている
for received in rx { // ⚠️ 永遠に待ち続ける
println!("{}", received);
}
// ✅ 良い例
let (tx, rx) = mpsc::channel();
for i in 0..5 {
let tx_clone = tx.clone();
thread::spawn(move || tx_clone.send(i).unwrap());
}
drop(tx); // 元のtxをdrop
for received in rx { // 正しく終了する
println!("{}", received);
}
---
パフォーマンス比較とベンチマーク
シングルスレッド vs マルチスレッド
use std::thread;
use std::time::Instant;
fn compute_heavy(n: u64) -> u64 {
(0..n).map(|x| x * x).sum()
}
fn main() {
let n = 1_000_000;
// シングルスレッド
let start = Instant::now();
let result1 = compute_heavy(n);
let result2 = compute_heavy(n);
let result3 = compute_heavy(n);
let result4 = compute_heavy(n);
let single_duration = start.elapsed();
println!("Single-threaded: {:?}", single_duration);
// マルチスレッド
let start = Instant::now();
let handles: Vec<_> = (0..4)
.map(|_| thread::spawn(move || compute_heavy(n)))
.collect();
let results: Vec<_> = handles.into_iter()
.map(|h| h.join().unwrap())
.collect();
let multi_duration = start.elapsed();
println!("Multi-threaded: {:?}", multi_duration);
println!("Speedup: {:.2}x", single_duration.as_secs_f64() / multi_duration.as_secs_f64());
}
典型的な結果(4コアCPU):
Single-threaded: 842ms
Multi-threaded: 235ms
Speedup: 3.58x
Mutex vs RwLock
use std::sync::{Arc, Mutex, RwLock};
use std::thread;
use std::time::Instant;
fn benchmark_mutex() -> std::time::Duration {
let data = Arc::new(Mutex::new(0));
let start = Instant::now();
let handles: Vec<_> = (0..100)
.map(|_| {
let data = Arc::clone(&data);
thread::spawn(move || {
for _ in 0..1000 {
let _guard = data.lock().unwrap();
// 読み取りのみ
}
})
})
.collect();
for h in handles {
h.join().unwrap();
}
start.elapsed()
}
fn benchmark_rwlock() -> std::time::Duration {
let data = Arc::new(RwLock::new(0));
let start = Instant::now();
let handles: Vec<_> = (0..100)
.map(|_| {
let data = Arc::clone(&data);
thread::spawn(move || {
for _ in 0..1000 {
let _guard = data.read().unwrap();
// 読み取りのみ
}
})
})
.collect();
for h in handles {
h.join().unwrap();
}
start.elapsed()
}
fn main() {
println!("Mutex: {:?}", benchmark_mutex());
println!("RwLock: {:?}", benchmark_rwlock());
}
典型的な結果:
Mutex: 245ms
RwLock: 89ms
読み取りが多い場合、RwLockは約2.7倍高速!
---
完全な実装例: 並行Webスクレイパー
use std::sync::{Arc, Mutex};
use std::thread;
fn fetch_url(url: &str) -> String {
// 実際のHTTPリクエストの代わりにシミュレート
thread::sleep(std::time::Duration::from_millis(100));
format!("Content from {}", url)
}
fn main() {
let urls = vec![
"https://example.com/1",
"https://example.com/2",
"https://example.com/3",
"https://example.com/4",
"https://example.com/5",
];
// 結果を格納する共有ベクター
let results = Arc::new(Mutex::new(Vec::new()));
let handles: Vec<_> = urls
.into_iter()
.map(|url| {
let results = Arc::clone(&results);
thread::spawn(move || {
println!("Fetching: {}", url);
let content = fetch_url(url);
// ロックを取得して結果を追加
let mut results = results.lock().unwrap();
results.push((url.to_string(), content));
})
})
.collect();
// 全スレッド完了を待つ
for handle in handles {
handle.join().unwrap();
}
// 結果を表示
let results = results.lock().unwrap();
for (url, content) in results.iter() {
println!("{}: {}", url, content);
}
}
---
まとめ
3つのExerciseを通じて、Rustの並行処理の基本を学びました:
- スレッド生成:
thread::spawnとjoin - 共有状態:
Arcパターン> - メッセージパッシング:
mpsc::channel
これらは実際のプロダクションコードで頻繁に使われるパターンです!