第16章: 並行処理 (Concurrency)
学習目標
- スレッドの生成と管理を学ぶ
- Send / Sync トレイトを理解する
- チャネルによるメッセージパッシングを学ぶ
- 共有状態の並行処理を理解する
---
16.1 並行処理の基本
16.1.1 並行性と並列性
┌─────────────────────────────────────────────────────────┐
│ 並行性 (Concurrency) vs 並列性 (Parallelism) │
├─────────────────────────────────────────────────────────┤
│ │
│ 【並行性 (Concurrency)】 │
│ - 複数のタスクが進行中 │
│ - シングルコアでも可能 │
│ - タイムスライス、コンテキストスイッチ │
│ │
│ 【並列性 (Parallelism)】 │
│ - 複数のタスクが同時実行 │
│ - マルチコア必須 │
│ - 真の同時実行 │
│ │
└─────────────────────────────────────────────────────────┘
16.1.2 Rustの並行処理モデル
RustはFearless Concurrency(恐れなき並行性)を提供:
---
16.2 スレッドの基本
16.2.1 スレッドの生成
use std::thread;
use std::time::Duration;
fn main() {
// 新しいスレッドを生成
thread::spawn(|| {
for i in 1..10 {
println!("spawned thread: {}", i);
thread::sleep(Duration::from_millis(1));
}
});
// メインスレッド
for i in 1..5 {
println!("main thread: {}", i);
thread::sleep(Duration::from_millis(1));
}
}
問題点: メインスレッドが終了すると、スポーンしたスレッドも終了してしまう。
16.2.2 JoinHandle - スレッドの待機
use std::thread;
use std::time::Duration;
fn main() {
let handle = thread::spawn(|| {
for i in 1..10 {
println!("spawned thread: {}", i);
thread::sleep(Duration::from_millis(1));
}
});
for i in 1..5 {
println!("main thread: {}", i);
thread::sleep(Duration::from_millis(1));
}
// スレッドの終了を待つ
handle.join().unwrap();
}
16.2.3 move クロージャ
use std::thread;
fn main() {
let v = vec![1, 2, 3];
// moveを使って所有権を移動
let handle = thread::spawn(move || {
println!("Vector: {:?}", v);
});
// println!("Vector: {:?}", v); // エラー!所有権が移動済み
handle.join().unwrap();
}
なぜmoveが必要?
use std::thread;
fn main() {
let v = vec![1, 2, 3];
// moveなし:コンパイルエラー
// let handle = thread::spawn(|| {
// println!("{:?}", v); // vの参照が無効になる可能性
// });
// moveあり:OK
let handle = thread::spawn(move || {
println!("{:?}", v); // 所有権を移動
});
handle.join().unwrap();
}
---
16.3 Send と Sync トレイト
16.3.1 Send トレイト
Send は、スレッド間で所有権を転送できることを示すマーカートレイト。
use std::thread;
fn main() {
let v = vec![1, 2, 3]; // Vec<T>はSend
let handle = thread::spawn(move || {
println!("{:?}", v);
});
handle.join().unwrap();
}
Sendではない型:
use std::rc::Rc;
use std::thread;
fn main() {
let rc = Rc::new(5);
// エラー!RcはSendではない
// let handle = thread::spawn(move || {
// println!("{}", rc);
// });
}
16.3.2 Sync トレイト
Sync は、複数のスレッドから参照できることを示すマーカートレイト。
T is Sync ⟺ &T is Send
型Tが Sync ⟺ &T が Send
例:
use std::sync::Arc;
use std::thread;
fn main() {
let data = Arc::new(vec![1, 2, 3]); // Arc<T>はSync
let mut handles = vec![];
for i in 0..3 {
let data = Arc::clone(&data);
let handle = thread::spawn(move || {
println!("Thread {}: {:?}", i, data);
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
}
16.3.3 Send / Sync の自動実装
┌─────────────────────────────────────────────────────────┐
│ Send / Sync の自動実装規則 │
├─────────────────────────────────────────────────────────┤
│ │
│ すべてのフィールドが Send → 構造体も Send │
│ すべてのフィールドが Sync → 構造体も Sync │
│ │
│ 【Sendではない型】 │
│ - Rc<T> (参照カウントが非スレッドセーフ) │
│ - RefCell<T> (借用チェックが非スレッドセーフ) │
│ │
│ 【Syncではない型】 │
│ - Cell<T> │
│ - RefCell<T> │
│ │
└─────────────────────────────────────────────────────────┘
---
16.4 メッセージパッシング
16.4.1 チャネルの基本
use std::sync::mpsc; // multiple producer, single consumer
use std::thread;
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let val = String::from("Hello from thread");
tx.send(val).unwrap();
});
let received = rx.recv().unwrap();
println!("Got: {}", received);
}
16.4.2 複数のメッセージ
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let vals = vec![
String::from("hi"),
String::from("from"),
String::from("the"),
String::from("thread"),
];
for val in vals {
tx.send(val).unwrap();
thread::sleep(Duration::from_secs(1));
}
});
for received in rx {
println!("Got: {}", received);
}
}
16.4.3 複数のプロデューサー
use std::sync::mpsc;
use std::thread;
fn main() {
let (tx, rx) = mpsc::channel();
let tx1 = tx.clone();
thread::spawn(move || {
tx1.send(String::from("Thread 1")).unwrap();
});
thread::spawn(move || {
tx.send(String::from("Thread 2")).unwrap();
});
for received in rx {
println!("Got: {}", received);
}
}
16.4.4 所有権の移動
use std::sync::mpsc;
use std::thread;
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let val = String::from("Hello");
tx.send(val).unwrap();
// println!("{}", val); // エラー!所有権が移動済み
});
let received = rx.recv().unwrap();
println!("Got: {}", received);
}
---
16.5 共有状態の並行処理
16.5.1 Mutex - 相互排他
use std::sync::Mutex;
fn main() {
let m = Mutex::new(5);
{
let mut num = m.lock().unwrap();
*num = 6;
} // ロックが自動的に解放される
println!("m = {:?}", m); // Mutex { data: 6 }
}
16.5.2 複数スレッドでのMutex
use std::sync::{Arc, Mutex};
use std::thread;
fn main() {
let counter = Arc::new(Mutex::new(0));
let mut handles = vec![];
for _ in 0..10 {
let counter = Arc::clone(&counter);
let handle = thread::spawn(move || {
let mut num = counter.lock().unwrap();
*num += 1;
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
println!("Result: {}", *counter.lock().unwrap()); // 10
}
16.5.3 デッドロックの危険性
use std::sync::{Arc, Mutex};
use std::thread;
fn main() {
let m1 = Arc::new(Mutex::new(0));
let m2 = Arc::new(Mutex::new(0));
let m1_clone = Arc::clone(&m1);
let m2_clone = Arc::clone(&m2);
// スレッド1
thread::spawn(move || {
let _a = m1_clone.lock().unwrap();
thread::sleep(std::time::Duration::from_millis(10));
let _b = m2_clone.lock().unwrap(); // デッドロック!
});
// スレッド2
thread::spawn(move || {
let _b = m2.lock().unwrap();
thread::sleep(std::time::Duration::from_millis(10));
let _a = m1.lock().unwrap(); // デッドロック!
});
thread::sleep(std::time::Duration::from_secs(1));
}
解決策: ロックの順序を統一する。
---
16.6 実践例
16.6.1 並列マップ
use std::sync::{Arc, Mutex};
use std::thread;
fn parallel_map<F>(data: Vec<i32>, f: F) -> Vec<i32>
where
F: Fn(i32) -> i32 + Send + Sync + 'static,
{
let f = Arc::new(f);
let result = Arc::new(Mutex::new(vec![0; data.len()]));
let mut handles = vec![];
for (i, value) in data.into_iter().enumerate() {
let f = Arc::clone(&f);
let result = Arc::clone(&result);
let handle = thread::spawn(move || {
let transformed = f(value);
result.lock().unwrap()[i] = transformed;
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
Arc::try_unwrap(result).unwrap().into_inner().unwrap()
}
fn main() {
let data = vec![1, 2, 3, 4, 5];
let result = parallel_map(data, |x| x * 2);
println!("{:?}", result); // [2, 4, 6, 8, 10]
}
16.6.2 プロデューサー・コンシューマーパターン
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
let (tx, rx) = mpsc::channel();
// プロデューサー
let producer = thread::spawn(move || {
for i in 0..10 {
println!("Producing: {}", i);
tx.send(i).unwrap();
thread::sleep(Duration::from_millis(100));
}
});
// コンシューマー
let consumer = thread::spawn(move || {
for received in rx {
println!("Consuming: {}", received);
thread::sleep(Duration::from_millis(150));
}
});
producer.join().unwrap();
consumer.join().unwrap();
}
16.6.3 ワーカープール
use std::sync::mpsc;
use std::sync::{Arc, Mutex};
use std::thread;
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl Worker {
fn new(
id: usize,
receiver: Arc<Mutex<mpsc::Receiver<Job>>>,
) -> Worker {
let thread = thread::spawn(move || loop {
let job = receiver.lock().unwrap().recv();
match job {
Ok(job) => {
println!("Worker {} executing job", id);
job();
}
Err(_) => break,
}
});
Worker { id, thread }
}
}
type Job = Box<dyn FnOnce() + Send + 'static>;
struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}
impl ThreadPool {
fn new(size: usize) -> ThreadPool {
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool { workers, sender }
}
fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.send(job).unwrap();
}
}
fn main() {
let pool = ThreadPool::new(4);
for i in 0..8 {
pool.execute(move || {
println!("Executing job {}", i);
thread::sleep(std::time::Duration::from_secs(1));
});
}
thread::sleep(std::time::Duration::from_secs(5));
}
---
16.7 並行処理のベストプラクティス
16.7.1 メッセージパッシング vs 共有状態
┌─────────────────────────────────────────────────────────┐
│ メッセージパッシング vs 共有状態 │
├─────────────────────────────────────────────────────────┤
│ │
│ 【メッセージパッシング(チャネル)】 │
│ ✓ 所有権の明確な移動 │
│ ✓ デッドロックの心配なし │
│ ✓ Goの思想("Don't communicate by sharing memory") │
│ ✗ オーバーヘッドがやや大きい │
│ │
│ 【共有状態(Mutex/Arc)】 │
│ ✓ 直感的 │
│ ✓ 既存のデータ構造を再利用しやすい │
│ ✗ デッドロックの可能性 │
│ ✗ 複雑な同期が必要 │
│ │
└─────────────────────────────────────────────────────────┘
16.7.2 並行処理のパターン
// パターン1: 並列マップ(データ並列)
let results: Vec<_> = data
.into_iter()
.map(|x| thread::spawn(move || process(x)))
.collect::<Vec<_>>()
.into_iter()
.map(|h| h.join().unwrap())
.collect();
// パターン2: パイプライン(タスク並列)
let (tx1, rx1) = mpsc::channel();
let (tx2, rx2) = mpsc::channel();
thread::spawn(move || {
for x in data {
tx1.send(stage1(x)).unwrap();
}
});
thread::spawn(move || {
for x in rx1 {
tx2.send(stage2(x)).unwrap();
}
});
// パターン3: ワーカープール(タスクプール)
let pool = ThreadPool::new(num_cpus);
for task in tasks {
pool.execute(task);
}
---
16.8 まとめ
Rustの並行処理の強み
┌─────────────────────────────────────────────────────────┐
│ Rustの並行処理が安全な理由 │
├─────────────────────────────────────────────────────────┤
│ │
│ 1. 所有権システム │
│ └─ データ競合をコンパイル時に防ぐ │
│ │
│ 2. Send / Sync トレイト │
│ └─ 型システムでスレッド安全性を保証 │
│ │
│ 3. ゼロコスト抽象化 │
│ └─ 安全性を保ちながら高速 │
│ │
│ 4. RAII │
│ └─ ロックの自動解放 │
│ │
└─────────────────────────────────────────────────────────┘
次のステップ
次の章では、非同期プログラミングについて学びます。async/await、Future、tokioなどの非同期ランタイムを理解します。
---