Day 5: 並行処理入門 - 解答例

Exercise 1: スレッド生成と同期

基本実装

use std::thread;
use std::time::Duration;

fn main() {
    // 10個のスレッドを生成し、JoinHandleを保存
    let handles: Vec<_> = (0..10)
        .map(|i| {
            // moveクロージャで i の所有権をスレッドに移動
            thread::spawn(move || {
                println!("Thread {} started", i);
                // 実際の作業をシミュレート
                thread::sleep(Duration::from_millis(10));
                println!("Thread {} finished", i);
                i * 2  // 戻り値
            })
        })
        .collect();

    // 全てのスレッドの完了を待つ
    for handle in handles {
        // join()は Result<T> を返す
        match handle.join() {
            Ok(result) => println!("Thread returned: {}", result),
            Err(e) => eprintln!("Thread panicked: {:?}", e),
        }
    }

    println!("All threads completed!");
}

出力例:

Thread 0 started
Thread 1 started
Thread 2 started
...
Thread 0 finished
Thread returned: 0
Thread 1 finished
Thread returned: 2
...
All threads completed!

改良版: スレッドに名前を付ける

use std::thread;

fn main() {
    let handles: Vec<_> = (0..10)
        .map(|i| {
            // スレッドビルダーで名前を設定
            thread::Builder::new()
                .name(format!("worker-{}", i))
                .spawn(move || {
                    println!(
                        "Thread {:?} (index {}) is running",
                        thread::current().name(),
                        i
                    );
                    i * 2
                })
                .unwrap()  // Result<JoinHandle> をunwrap
        })
        .collect();

    for (i, handle) in handles.into_iter().enumerate() {
        let result = handle.join().unwrap();
        println!("Worker {} returned: {}", i, result);
    }
}

最適化版: スレッドプールの考え方

use std::sync::mpsc;
use std::thread;

fn main() {
    let (result_tx, result_rx) = mpsc::channel();

    // ワーカースレッド(4つのみ)
    let handles: Vec<_> = (0..4)
        .map(|worker_id| {
            let tx = result_tx.clone();
            thread::spawn(move || {
                // 各ワーカーが複数のタスクを処理
                for task_id in 0..3 {
                    let result = worker_id * 10 + task_id;
                    tx.send((worker_id, task_id, result)).unwrap();
                    thread::sleep(std::time::Duration::from_millis(10));
                }
            })
        })
        .collect();

    // 送信側のオリジナルをdrop(重要!)
    drop(result_tx);

    // 全ての結果を受信
    for (worker, task, result) in result_rx {
        println!("Worker {} completed task {}: result = {}", worker, task, result);
    }

    // 全スレッドの終了を待つ
    for handle in handles {
        handle.join().unwrap();
    }
}

---

Exercise 2: Arc + Mutex による共有状態

基本実装

use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
    // Arc<Mutex<T>>で共有可能なミュータブルな状態を作成
    let counter = Arc::new(Mutex::new(0));

    let handles: Vec<_> = (0..10)
        .map(|_| {
            // Arcをクローン(参照カウントを増やす)
            let counter = Arc::clone(&counter);

            thread::spawn(move || {
                // Mutexのロックを取得
                let mut num = counter.lock().unwrap();

                // 共有状態を更新
                *num += 1;

                // numがスコープを抜けるとロックが自動解放
            })
        })
        .collect();

    // 全スレッドの完了を待つ
    for handle in handles {
        handle.join().unwrap();
    }

    // 最終結果を表示
    println!("Result: {}", *counter.lock().unwrap());
    // 期待値: 10
}

改良版: より現実的な例(銀行口座)

use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

#[derive(Debug)]
struct BankAccount {
    balance: i32,
}

impl BankAccount {
    fn new(initial_balance: i32) -> Self {
        BankAccount { balance: initial_balance }
    }

    fn deposit(&mut self, amount: i32) {
        println!("Depositing {} yen", amount);
        self.balance += amount;
    }

    fn withdraw(&mut self, amount: i32) -> bool {
        if self.balance >= amount {
            println!("Withdrawing {} yen", amount);
            self.balance -= amount;
            true
        } else {
            println!("Insufficient funds for {} yen", amount);
            false
        }
    }
}

fn main() {
    // 共有銀行口座
    let account = Arc::new(Mutex::new(BankAccount::new(10000)));

    let mut handles = vec![];

    // 入金スレッド(3つ)
    for i in 0..3 {
        let account = Arc::clone(&account);
        let handle = thread::spawn(move || {
            thread::sleep(Duration::from_millis(i * 10));
            let mut acc = account.lock().unwrap();
            acc.deposit(1000);
        });
        handles.push(handle);
    }

    // 出金スレッド(5つ)
    for i in 0..5 {
        let account = Arc::clone(&account);
        let handle = thread::spawn(move || {
            thread::sleep(Duration::from_millis(i * 15));
            let mut acc = account.lock().unwrap();
            acc.withdraw(2000);
        });
        handles.push(handle);
    }

    // 全スレッド完了を待つ
    for handle in handles {
        handle.join().unwrap();
    }

    // 最終残高
    let final_balance = account.lock().unwrap().balance;
    println!("\n最終残高: {} yen", final_balance);
    // 初期: 10000, 入金: +3000, 出金: -10000 (一部失敗)
}

最適化版: RwLockを使った読み取り最適化

use std::sync::{Arc, RwLock};
use std::thread;

fn main() {
    // 読み取りが多い場合はRwLockが効率的
    let data = Arc::new(RwLock::new(vec![1, 2, 3, 4, 5]));

    let mut handles = vec![];

    // 読み取りスレッド(多数)
    for i in 0..10 {
        let data = Arc::clone(&data);
        let handle = thread::spawn(move || {
            // read()は複数同時にロック取得可能
            let vec = data.read().unwrap();
            let sum: i32 = vec.iter().sum();
            println!("Reader {}: sum = {}", i, sum);
        });
        handles.push(handle);
    }

    // 書き込みスレッド(少数)
    for i in 0..2 {
        let data = Arc::clone(&data);
        let handle = thread::spawn(move || {
            thread::sleep(std::time::Duration::from_millis(10));
            // write()は排他的
            let mut vec = data.write().unwrap();
            vec.push(i + 10);
            println!("Writer {} added value", i);
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }

    println!("Final data: {:?}", data.read().unwrap());
}

---

Exercise 3: チャネルによるメッセージパッシング

基本実装

use std::sync::mpsc;
use std::thread;

fn main() {
    // チャネルを作成(送信側tx、受信側rx)
    let (tx, rx) = mpsc::channel();

    // 送信スレッド
    thread::spawn(move || {
        let message = String::from("Hello from thread!");

        // 所有権ごと送信(messageはmoveされる)
        tx.send(message).unwrap();

        // println!("{}", message);  // ❌ エラー: messageは既に移動済み
    });

    // メインスレッドで受信
    let received = rx.recv().unwrap();  // ブロッキング受信
    println!("Received: {}", received);
}

改良版: 複数メッセージの送受信

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let messages = vec![
            "Hello",
            "from",
            "the",
            "thread",
        ];

        for msg in messages {
            tx.send(msg).unwrap();
            // メッセージ間に遅延
            thread::sleep(Duration::from_millis(100));
        }
    });

    // イテレータとして受信
    for received in rx {
        println!("Got: {}", received);
    }
    // 送信側がdropされるとイテレータが終了
}

最適化版: 複数プロデューサー(MPSC)

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    let mut handles = vec![];

    // 5つのプロデューサースレッド
    for id in 0..5 {
        // 送信側をクローン
        let thread_tx = tx.clone();

        let handle = thread::spawn(move || {
            for i in 0..3 {
                let message = format!("Producer {} - Message {}", id, i);
                thread_tx.send(message).unwrap();
                thread::sleep(std::time::Duration::from_millis(10));
            }
        });

        handles.push(handle);
    }

    // 元のtxをdrop(重要!これがないとrxが永遠に待つ)
    drop(tx);

    // 全メッセージを受信
    for received in rx {
        println!("{}", received);
    }

    // スレッドの完了を待つ
    for handle in handles {
        handle.join().unwrap();
    }
}

応用例: パイプライン処理

use std::sync::mpsc;
use std::thread;

fn main() {
    // ステージ1: データ生成
    let (tx1, rx1) = mpsc::channel();
    thread::spawn(move || {
        for i in 1..=10 {
            tx1.send(i).unwrap();
        }
    });

    // ステージ2: データ変換(2倍)
    let (tx2, rx2) = mpsc::channel();
    thread::spawn(move || {
        for received in rx1 {
            let doubled = received * 2;
            println!("Stage 2: {} -> {}", received, doubled);
            tx2.send(doubled).unwrap();
        }
    });

    // ステージ3: データ集計
    let (tx3, rx3) = mpsc::channel();
    thread::spawn(move || {
        let mut sum = 0;
        for received in rx2 {
            sum += received;
            println!("Stage 3: current sum = {}", sum);
            tx3.send(sum).unwrap();
        }
    });

    // 最終結果
    let mut final_sum = 0;
    for received in rx3 {
        final_sum = received;
    }
    println!("\nFinal result: {}", final_sum);
    // 期待値: (1+2+...+10) * 2 = 110
}

---

よくある間違いと対策

間違い 1: Arcなしで共有しようとする

// ❌ コンパイルエラー
let data = vec![1, 2, 3];

let handle1 = thread::spawn(move || println!("{:?}", data));
let handle2 = thread::spawn(move || println!("{:?}", data)); // エラー: dataは既に移動

// ✅ 正しい方法
let data = Arc::new(vec![1, 2, 3]);
let data1 = Arc::clone(&data);
let data2 = Arc::clone(&data);

let handle1 = thread::spawn(move || println!("{:?}", data1));
let handle2 = thread::spawn(move || println!("{:?}", data2));

間違い 2: ロックのスコープが長すぎる

// ❌ 悪い例: ロックを長時間保持
let data = Arc::new(Mutex::new(0));
let guard = data.lock().unwrap();
// 長い処理...
thread::sleep(Duration::from_secs(10));  // ⚠️ 他のスレッドがブロックされる
drop(guard);

// ✅ 良い例: スコープでロックを制限
{
    let mut guard = data.lock().unwrap();
    *guard += 1;
}  // ここでロック解放
// 長い処理...
thread::sleep(Duration::from_secs(10));

間違い 3: joinを忘れる

// ❌ 悪い例: スレッドを待たない
for i in 0..10 {
    thread::spawn(move || {
        println!("Thread {}", i);
    });
}
// メインスレッドが先に終了すると、子スレッドも強制終了

// ✅ 良い例: joinで完了を待つ
let handles: Vec<_> = (0..10)
    .map(|i| thread::spawn(move || println!("Thread {}", i)))
    .collect();

for handle in handles {
    handle.join().unwrap();
}

間違い 4: チャネルの送信側をdropし忘れる

// ❌ 悪い例
let (tx, rx) = mpsc::channel();

for i in 0..5 {
    let tx_clone = tx.clone();
    thread::spawn(move || tx_clone.send(i).unwrap());
}
// txがまだ生きている

for received in rx {  // ⚠️ 永遠に待ち続ける
    println!("{}", received);
}

// ✅ 良い例
let (tx, rx) = mpsc::channel();

for i in 0..5 {
    let tx_clone = tx.clone();
    thread::spawn(move || tx_clone.send(i).unwrap());
}
drop(tx);  // 元のtxをdrop

for received in rx {  // 正しく終了する
    println!("{}", received);
}

---

パフォーマンス比較とベンチマーク

シングルスレッド vs マルチスレッド

use std::thread;
use std::time::Instant;

fn compute_heavy(n: u64) -> u64 {
    (0..n).map(|x| x * x).sum()
}

fn main() {
    let n = 1_000_000;

    // シングルスレッド
    let start = Instant::now();
    let result1 = compute_heavy(n);
    let result2 = compute_heavy(n);
    let result3 = compute_heavy(n);
    let result4 = compute_heavy(n);
    let single_duration = start.elapsed();

    println!("Single-threaded: {:?}", single_duration);

    // マルチスレッド
    let start = Instant::now();
    let handles: Vec<_> = (0..4)
        .map(|_| thread::spawn(move || compute_heavy(n)))
        .collect();

    let results: Vec<_> = handles.into_iter()
        .map(|h| h.join().unwrap())
        .collect();
    let multi_duration = start.elapsed();

    println!("Multi-threaded: {:?}", multi_duration);
    println!("Speedup: {:.2}x", single_duration.as_secs_f64() / multi_duration.as_secs_f64());
}

典型的な結果(4コアCPU):

Single-threaded: 842ms
Multi-threaded: 235ms
Speedup: 3.58x

Mutex vs RwLock

use std::sync::{Arc, Mutex, RwLock};
use std::thread;
use std::time::Instant;

fn benchmark_mutex() -> std::time::Duration {
    let data = Arc::new(Mutex::new(0));
    let start = Instant::now();

    let handles: Vec<_> = (0..100)
        .map(|_| {
            let data = Arc::clone(&data);
            thread::spawn(move || {
                for _ in 0..1000 {
                    let _guard = data.lock().unwrap();
                    // 読み取りのみ
                }
            })
        })
        .collect();

    for h in handles {
        h.join().unwrap();
    }

    start.elapsed()
}

fn benchmark_rwlock() -> std::time::Duration {
    let data = Arc::new(RwLock::new(0));
    let start = Instant::now();

    let handles: Vec<_> = (0..100)
        .map(|_| {
            let data = Arc::clone(&data);
            thread::spawn(move || {
                for _ in 0..1000 {
                    let _guard = data.read().unwrap();
                    // 読み取りのみ
                }
            })
        })
        .collect();

    for h in handles {
        h.join().unwrap();
    }

    start.elapsed()
}

fn main() {
    println!("Mutex: {:?}", benchmark_mutex());
    println!("RwLock: {:?}", benchmark_rwlock());
}

典型的な結果:

Mutex: 245ms
RwLock: 89ms

読み取りが多い場合、RwLockは約2.7倍高速!

---

完全な実装例: 並行Webスクレイパー

use std::sync::{Arc, Mutex};
use std::thread;

fn fetch_url(url: &str) -> String {
    // 実際のHTTPリクエストの代わりにシミュレート
    thread::sleep(std::time::Duration::from_millis(100));
    format!("Content from {}", url)
}

fn main() {
    let urls = vec![
        "https://example.com/1",
        "https://example.com/2",
        "https://example.com/3",
        "https://example.com/4",
        "https://example.com/5",
    ];

    // 結果を格納する共有ベクター
    let results = Arc::new(Mutex::new(Vec::new()));

    let handles: Vec<_> = urls
        .into_iter()
        .map(|url| {
            let results = Arc::clone(&results);
            thread::spawn(move || {
                println!("Fetching: {}", url);
                let content = fetch_url(url);

                // ロックを取得して結果を追加
                let mut results = results.lock().unwrap();
                results.push((url.to_string(), content));
            })
        })
        .collect();

    // 全スレッド完了を待つ
    for handle in handles {
        handle.join().unwrap();
    }

    // 結果を表示
    let results = results.lock().unwrap();
    for (url, content) in results.iter() {
        println!("{}: {}", url, content);
    }
}

---

まとめ

3つのExerciseを通じて、Rustの並行処理の基本を学びました:

  • スレッド生成: thread::spawnjoin
  • 共有状態: Arc>パターン
  • メッセージパッシング: mpsc::channel

これらは実際のプロダクションコードで頻繁に使われるパターンです!