第16章: 並行処理 (Concurrency)

学習目標

  • スレッドの生成と管理を学ぶ
  • Send / Sync トレイトを理解する
  • チャネルによるメッセージパッシングを学ぶ
  • 共有状態の並行処理を理解する
  • ---

    16.1 並行処理の基本

    16.1.1 並行性と並列性

    ┌─────────────────────────────────────────────────────────┐
    │          並行性 (Concurrency) vs 並列性 (Parallelism)   │
    ├─────────────────────────────────────────────────────────┤
    │                                                         │
    │  【並行性 (Concurrency)】                                │
    │  - 複数のタスクが進行中                                  │
    │  - シングルコアでも可能                                  │
    │  - タイムスライス、コンテキストスイッチ                  │
    │                                                         │
    │  【並列性 (Parallelism)】                                │
    │  - 複数のタスクが同時実行                                │
    │  - マルチコア必須                                        │
    │  - 真の同時実行                                          │
    │                                                         │
    └─────────────────────────────────────────────────────────┘
    

    16.1.2 Rustの並行処理モデル

    RustはFearless Concurrency(恐れなき並行性)を提供:

  • コンパイル時にデータ競合を検出
  • 型システムでスレッド安全性を保証
  • 所有権システムで安全な並行処理を実現
  • ---

    16.2 スレッドの基本

    16.2.1 スレッドの生成

    use std::thread;
    use std::time::Duration;
    
    fn main() {
        // 新しいスレッドを生成
        thread::spawn(|| {
            for i in 1..10 {
                println!("spawned thread: {}", i);
                thread::sleep(Duration::from_millis(1));
            }
        });
    
        // メインスレッド
        for i in 1..5 {
            println!("main thread: {}", i);
            thread::sleep(Duration::from_millis(1));
        }
    }
    

    問題点: メインスレッドが終了すると、スポーンしたスレッドも終了してしまう。

    16.2.2 JoinHandle - スレッドの待機

    use std::thread;
    use std::time::Duration;
    
    fn main() {
        let handle = thread::spawn(|| {
            for i in 1..10 {
                println!("spawned thread: {}", i);
                thread::sleep(Duration::from_millis(1));
            }
        });
    
        for i in 1..5 {
            println!("main thread: {}", i);
            thread::sleep(Duration::from_millis(1));
        }
    
        // スレッドの終了を待つ
        handle.join().unwrap();
    }
    

    16.2.3 move クロージャ

    use std::thread;
    
    fn main() {
        let v = vec![1, 2, 3];
    
        // moveを使って所有権を移動
        let handle = thread::spawn(move || {
            println!("Vector: {:?}", v);
        });
    
        // println!("Vector: {:?}", v);  // エラー!所有権が移動済み
    
        handle.join().unwrap();
    }
    

    なぜmoveが必要?

    use std::thread;
    
    fn main() {
        let v = vec![1, 2, 3];
    
        // moveなし:コンパイルエラー
        // let handle = thread::spawn(|| {
        //     println!("{:?}", v);  // vの参照が無効になる可能性
        // });
    
        // moveあり:OK
        let handle = thread::spawn(move || {
            println!("{:?}", v);  // 所有権を移動
        });
    
        handle.join().unwrap();
    }
    

    ---

    16.3 Send と Sync トレイト

    16.3.1 Send トレイト

    Send は、スレッド間で所有権を転送できることを示すマーカートレイト。

    use std::thread;
    
    fn main() {
        let v = vec![1, 2, 3];  // Vec<T>はSend
    
        let handle = thread::spawn(move || {
            println!("{:?}", v);
        });
    
        handle.join().unwrap();
    }
    

    Sendではない型:

    use std::rc::Rc;
    use std::thread;
    
    fn main() {
        let rc = Rc::new(5);
    
        // エラー!RcはSendではない
        // let handle = thread::spawn(move || {
        //     println!("{}", rc);
        // });
    }
    

    16.3.2 Sync トレイト

    Sync は、複数のスレッドから参照できることを示すマーカートレイト。

    T is Sync ⟺ &T is Send
    
    型Tが Sync ⟺ &T が Send
    

    :

    use std::sync::Arc;
    use std::thread;
    
    fn main() {
        let data = Arc::new(vec![1, 2, 3]);  // Arc<T>はSync
    
        let mut handles = vec![];
    
        for i in 0..3 {
            let data = Arc::clone(&data);
            let handle = thread::spawn(move || {
                println!("Thread {}: {:?}", i, data);
            });
            handles.push(handle);
        }
    
        for handle in handles {
            handle.join().unwrap();
        }
    }
    

    16.3.3 Send / Sync の自動実装

    ┌─────────────────────────────────────────────────────────┐
    │              Send / Sync の自動実装規則                  │
    ├─────────────────────────────────────────────────────────┤
    │                                                         │
    │  すべてのフィールドが Send → 構造体も Send               │
    │  すべてのフィールドが Sync → 構造体も Sync               │
    │                                                         │
    │  【Sendではない型】                                      │
    │  - Rc<T>       (参照カウントが非スレッドセーフ)          │
    │  - RefCell<T>  (借用チェックが非スレッドセーフ)          │
    │                                                         │
    │  【Syncではない型】                                      │
    │  - Cell<T>                                              │
    │  - RefCell<T>                                           │
    │                                                         │
    └─────────────────────────────────────────────────────────┘
    

    ---

    16.4 メッセージパッシング

    16.4.1 チャネルの基本

    use std::sync::mpsc;  // multiple producer, single consumer
    use std::thread;
    
    fn main() {
        let (tx, rx) = mpsc::channel();
    
        thread::spawn(move || {
            let val = String::from("Hello from thread");
            tx.send(val).unwrap();
        });
    
        let received = rx.recv().unwrap();
        println!("Got: {}", received);
    }
    

    16.4.2 複数のメッセージ

    use std::sync::mpsc;
    use std::thread;
    use std::time::Duration;
    
    fn main() {
        let (tx, rx) = mpsc::channel();
    
        thread::spawn(move || {
            let vals = vec![
                String::from("hi"),
                String::from("from"),
                String::from("the"),
                String::from("thread"),
            ];
    
            for val in vals {
                tx.send(val).unwrap();
                thread::sleep(Duration::from_secs(1));
            }
        });
    
        for received in rx {
            println!("Got: {}", received);
        }
    }
    

    16.4.3 複数のプロデューサー

    use std::sync::mpsc;
    use std::thread;
    
    fn main() {
        let (tx, rx) = mpsc::channel();
    
        let tx1 = tx.clone();
        thread::spawn(move || {
            tx1.send(String::from("Thread 1")).unwrap();
        });
    
        thread::spawn(move || {
            tx.send(String::from("Thread 2")).unwrap();
        });
    
        for received in rx {
            println!("Got: {}", received);
        }
    }
    

    16.4.4 所有権の移動

    use std::sync::mpsc;
    use std::thread;
    
    fn main() {
        let (tx, rx) = mpsc::channel();
    
        thread::spawn(move || {
            let val = String::from("Hello");
            tx.send(val).unwrap();
            // println!("{}", val);  // エラー!所有権が移動済み
        });
    
        let received = rx.recv().unwrap();
        println!("Got: {}", received);
    }
    

    ---

    16.5 共有状態の並行処理

    16.5.1 Mutex - 相互排他

    use std::sync::Mutex;
    
    fn main() {
        let m = Mutex::new(5);
    
        {
            let mut num = m.lock().unwrap();
            *num = 6;
        }  // ロックが自動的に解放される
    
        println!("m = {:?}", m);  // Mutex { data: 6 }
    }
    

    16.5.2 複数スレッドでのMutex

    use std::sync::{Arc, Mutex};
    use std::thread;
    
    fn main() {
        let counter = Arc::new(Mutex::new(0));
        let mut handles = vec![];
    
        for _ in 0..10 {
            let counter = Arc::clone(&counter);
            let handle = thread::spawn(move || {
                let mut num = counter.lock().unwrap();
                *num += 1;
            });
            handles.push(handle);
        }
    
        for handle in handles {
            handle.join().unwrap();
        }
    
        println!("Result: {}", *counter.lock().unwrap());  // 10
    }
    

    16.5.3 デッドロックの危険性

    use std::sync::{Arc, Mutex};
    use std::thread;
    
    fn main() {
        let m1 = Arc::new(Mutex::new(0));
        let m2 = Arc::new(Mutex::new(0));
    
        let m1_clone = Arc::clone(&m1);
        let m2_clone = Arc::clone(&m2);
    
        // スレッド1
        thread::spawn(move || {
            let _a = m1_clone.lock().unwrap();
            thread::sleep(std::time::Duration::from_millis(10));
            let _b = m2_clone.lock().unwrap();  // デッドロック!
        });
    
        // スレッド2
        thread::spawn(move || {
            let _b = m2.lock().unwrap();
            thread::sleep(std::time::Duration::from_millis(10));
            let _a = m1.lock().unwrap();  // デッドロック!
        });
    
        thread::sleep(std::time::Duration::from_secs(1));
    }
    

    解決策: ロックの順序を統一する。

    ---

    16.6 実践例

    16.6.1 並列マップ

    use std::sync::{Arc, Mutex};
    use std::thread;
    
    fn parallel_map<F>(data: Vec<i32>, f: F) -> Vec<i32>
    where
        F: Fn(i32) -> i32 + Send + Sync + 'static,
    {
        let f = Arc::new(f);
        let result = Arc::new(Mutex::new(vec![0; data.len()]));
        let mut handles = vec![];
    
        for (i, value) in data.into_iter().enumerate() {
            let f = Arc::clone(&f);
            let result = Arc::clone(&result);
    
            let handle = thread::spawn(move || {
                let transformed = f(value);
                result.lock().unwrap()[i] = transformed;
            });
    
            handles.push(handle);
        }
    
        for handle in handles {
            handle.join().unwrap();
        }
    
        Arc::try_unwrap(result).unwrap().into_inner().unwrap()
    }
    
    fn main() {
        let data = vec![1, 2, 3, 4, 5];
        let result = parallel_map(data, |x| x * 2);
        println!("{:?}", result);  // [2, 4, 6, 8, 10]
    }
    

    16.6.2 プロデューサー・コンシューマーパターン

    use std::sync::mpsc;
    use std::thread;
    use std::time::Duration;
    
    fn main() {
        let (tx, rx) = mpsc::channel();
    
        // プロデューサー
        let producer = thread::spawn(move || {
            for i in 0..10 {
                println!("Producing: {}", i);
                tx.send(i).unwrap();
                thread::sleep(Duration::from_millis(100));
            }
        });
    
        // コンシューマー
        let consumer = thread::spawn(move || {
            for received in rx {
                println!("Consuming: {}", received);
                thread::sleep(Duration::from_millis(150));
            }
        });
    
        producer.join().unwrap();
        consumer.join().unwrap();
    }
    

    16.6.3 ワーカープール

    use std::sync::mpsc;
    use std::sync::{Arc, Mutex};
    use std::thread;
    
    struct Worker {
        id: usize,
        thread: thread::JoinHandle<()>,
    }
    
    impl Worker {
        fn new(
            id: usize,
            receiver: Arc<Mutex<mpsc::Receiver<Job>>>,
        ) -> Worker {
            let thread = thread::spawn(move || loop {
                let job = receiver.lock().unwrap().recv();
    
                match job {
                    Ok(job) => {
                        println!("Worker {} executing job", id);
                        job();
                    }
                    Err(_) => break,
                }
            });
    
            Worker { id, thread }
        }
    }
    
    type Job = Box<dyn FnOnce() + Send + 'static>;
    
    struct ThreadPool {
        workers: Vec<Worker>,
        sender: mpsc::Sender<Job>,
    }
    
    impl ThreadPool {
        fn new(size: usize) -> ThreadPool {
            let (sender, receiver) = mpsc::channel();
            let receiver = Arc::new(Mutex::new(receiver));
    
            let mut workers = Vec::with_capacity(size);
            for id in 0..size {
                workers.push(Worker::new(id, Arc::clone(&receiver)));
            }
    
            ThreadPool { workers, sender }
        }
    
        fn execute<F>(&self, f: F)
        where
            F: FnOnce() + Send + 'static,
        {
            let job = Box::new(f);
            self.sender.send(job).unwrap();
        }
    }
    
    fn main() {
        let pool = ThreadPool::new(4);
    
        for i in 0..8 {
            pool.execute(move || {
                println!("Executing job {}", i);
                thread::sleep(std::time::Duration::from_secs(1));
            });
        }
    
        thread::sleep(std::time::Duration::from_secs(5));
    }
    

    ---

    16.7 並行処理のベストプラクティス

    16.7.1 メッセージパッシング vs 共有状態

    ┌─────────────────────────────────────────────────────────┐
    │          メッセージパッシング vs 共有状態                │
    ├─────────────────────────────────────────────────────────┤
    │                                                         │
    │  【メッセージパッシング(チャネル)】                    │
    │  ✓ 所有権の明確な移動                                    │
    │  ✓ デッドロックの心配なし                                │
    │  ✓ Goの思想("Don't communicate by sharing memory")    │
    │  ✗ オーバーヘッドがやや大きい                            │
    │                                                         │
    │  【共有状態(Mutex/Arc)】                               │
    │  ✓ 直感的                                                │
    │  ✓ 既存のデータ構造を再利用しやすい                      │
    │  ✗ デッドロックの可能性                                  │
    │  ✗ 複雑な同期が必要                                      │
    │                                                         │
    └─────────────────────────────────────────────────────────┘
    

    16.7.2 並行処理のパターン

    // パターン1: 並列マップ(データ並列)
    let results: Vec<_> = data
        .into_iter()
        .map(|x| thread::spawn(move || process(x)))
        .collect::<Vec<_>>()
        .into_iter()
        .map(|h| h.join().unwrap())
        .collect();
    
    // パターン2: パイプライン(タスク並列)
    let (tx1, rx1) = mpsc::channel();
    let (tx2, rx2) = mpsc::channel();
    
    thread::spawn(move || {
        for x in data {
            tx1.send(stage1(x)).unwrap();
        }
    });
    
    thread::spawn(move || {
        for x in rx1 {
            tx2.send(stage2(x)).unwrap();
        }
    });
    
    // パターン3: ワーカープール(タスクプール)
    let pool = ThreadPool::new(num_cpus);
    for task in tasks {
        pool.execute(task);
    }
    

    ---

    16.8 まとめ

    Rustの並行処理の強み

    ┌─────────────────────────────────────────────────────────┐
    │          Rustの並行処理が安全な理由                      │
    ├─────────────────────────────────────────────────────────┤
    │                                                         │
    │  1. 所有権システム                                       │
    │     └─ データ競合をコンパイル時に防ぐ                   │
    │                                                         │
    │  2. Send / Sync トレイト                                 │
    │     └─ 型システムでスレッド安全性を保証                  │
    │                                                         │
    │  3. ゼロコスト抽象化                                     │
    │     └─ 安全性を保ちながら高速                            │
    │                                                         │
    │  4. RAII                                                 │
    │     └─ ロックの自動解放                                  │
    │                                                         │
    └─────────────────────────────────────────────────────────┘
    

    次のステップ

    次の章では、非同期プログラミングについて学びます。async/await、Future、tokioなどの非同期ランタイムを理解します。

    ---

    参考資料

  • The Rust Book: Concurrency
  • Rust by Example: Threads
  • std::thread documentation