Day 5: 並行処理入門 - 背景知識

Fearless Concurrency - 恐れのない並行処理

Rustの並行処理は「Fearless Concurrency」という哲学のもとに設計されています。これは「コンパイル時に並行処理のバグを防げるため、実行時の恐れがない」という意味です。

並行処理の歴史と課題

1. 並行処理が必要な理由

現代のコンピュータはマルチコアプロセッサが標準です:

  • パフォーマンス: 複数のコアを活用して計算を高速化
  • レスポンシブ性: UIをブロックせずにバックグラウンド処理
  • スケーラビリティ: サーバーで数千の接続を同時処理
  • リソース効率: I/O待機中に他の作業を実行

シングルスレッド:
Task A ████████████ (12秒)
Task B             ████████████ (12秒)
合計: 24秒

マルチスレッド:
Task A ████████████
Task B ████████████
合計: 12秒

2. 従来の並行処理の問題点

多くのプログラミング言語では、並行処理は「難しく危険」とされてきました:

データ競合 (Data Race)

// C言語の例 - データ競合が発生
int counter = 0;

void* increment(void* arg) {
    for (int i = 0; i < 100000; i++) {
        counter++;  // ⚠️ 複数スレッドから同時アクセス
    }
    return NULL;
}

// 2つのスレッドで実行すると、
// 期待値: 200000
// 実際: 147382 (実行ごとに異なる!)

デッドロック (Deadlock)

// Javaの例 - デッドロックが発生
Object lock1 = new Object();
Object lock2 = new Object();

Thread t1 = new Thread(() -> {
    synchronized(lock1) {
        Thread.sleep(10);
        synchronized(lock2) { /* 作業 */ }
    }
});

Thread t2 = new Thread(() -> {
    synchronized(lock2) {  // lock2を先に取得
        Thread.sleep(10);
        synchronized(lock1) { /* 作業 */ }  // ⚠️ デッドロック
    }
});

競合状態 (Race Condition)

# Pythonの例 - 競合状態
balance = 1000

def withdraw(amount):
    global balance
    if balance >= amount:
        # ⚠️ ここで他のスレッドが実行される可能性
        balance -= amount
        return True
    return False

# 2つのスレッドで同時に900円引き出すと、
# 残高がマイナスになる可能性がある

3. Rustの解決策

Rustは型システムと所有権システムで、これらの問題をコンパイル時に検出します:

// ❌ コンパイルエラー!
let mut data = vec![1, 2, 3];

std::thread::spawn(|| {
    data.push(4);  // エラー: dataの所有権がない
});

println!("{:?}", data);

error[E0373]: closure may outlive the current function
  --> src/main.rs:4:23
   |
4  |     std::thread::spawn(|| {
   |                        ^^ may outlive borrowed value `data`
5  |         data.push(4);
   |         ---- `data` is borrowed here

このエラーメッセージは「スレッドがdataより長生きする可能性がある」と教えてくれます。

Rustの並行処理の仕組み

1. Send と Sync トレイト

Rustは2つの特別なマーカートレイトで並行処理の安全性を保証します:

Send トレイト

定義: スレッド間で所有権を移動できる型

// Send を実装している型
let data = vec![1, 2, 3];  // Vec<i32> は Send
std::thread::spawn(move || {
    println!("{:?}", data);  // ✅ OK: 所有権を移動
});

// Send を実装していない型
let rc = Rc::new(5);  // Rc<i32> は Send ではない
std::thread::spawn(move || {
    println!("{}", rc);  // ❌ コンパイルエラー
});

なぜRcはSendではない?

メインスレッド:          子スレッド:
Rc { count: 2 } -----> Rc { count: 2 }
     ↓                      ↓
  count--                count--
     ↓                      ↓
  count = ?  ⚠️ データ競合!

Rcの参照カウントはスレッドセーフではないため、複数スレッドで共有できません。

Sync トレイト

定義: 複数のスレッドから安全に参照できる型

// T が Sync ⇔ &T が Send

// i32 は Sync
let x = 42;
std::thread::spawn(|| {
    println!("{}", x);  // ✅ OK: i32 はコピー可能
});

// Cell<i32> は Sync ではない
use std::cell::Cell;
let cell = Cell::new(42);
std::thread::spawn(|| {
    cell.set(10);  // ❌ コンパイルエラー
});

主要な型のSend/Sync実装

Send Sync 理由
`i32`, `String` イミュータブルで安全
`Vec` ✅ (Tが Send) ✅ (Tが Sync) 所有権ベース
`Rc` 参照カウントが非アトミック
`Arc` ✅ (Tが Send+Sync) ✅ (Tが Sync) アトミックな参照カウント
`Cell` ✅ (Tが Send) 内部可変性が非同期
`Mutex` ✅ (Tが Send) ✅ (Tが Send) ロックで保護

2. 所有権による安全性

Rustの所有権システムは、並行処理でも機能します:

let data = vec![1, 2, 3];

// moveキーワードで所有権を移動
let handle = std::thread::spawn(move || {
    println!("{:?}", data);  // dataの所有権はこのスレッドに
});

// println!("{:?}", data);  // ❌ エラー: dataはすでに移動済み

handle.join().unwrap();

所有権の移動とライフタイム

メインスレッド:              子スレッド:
┌──────────┐
│ data     │ ──move──>    ┌──────────┐
│ (所有者)  │              │ data     │
└──────────┘              │ (新所有者)│
    ×                     └──────────┘
  使用不可                   ✓ 使用可

Rustの並行処理パターン

1. スレッド (Threads)

基本的なスレッド生成

use std::thread;
use std::time::Duration;

fn main() {
    // スレッドを生成
    let handle = thread::spawn(|| {
        for i in 1..10 {
            println!("子スレッド: {}", i);
            thread::sleep(Duration::from_millis(1));
        }
    });

    // メインスレッドも並行して実行
    for i in 1..5 {
        println!("メインスレッド: {}", i);
        thread::sleep(Duration::from_millis(1));
    }

    // 子スレッドの完了を待つ
    handle.join().unwrap();
}

出力例:

メインスレッド: 1
子スレッド: 1
メインスレッド: 2
子スレッド: 2
子スレッド: 3
メインスレッド: 3
...

JoinHandle の重要性

// ❌ 悪い例: スレッドを待たない
thread::spawn(|| {
    println!("この出力は表示されないかも");
});
// メインスレッドが先に終了すると、子スレッドも強制終了

// ✅ 良い例: joinで完了を待つ
let handle = thread::spawn(|| {
    println!("この出力は必ず表示される");
});
handle.join().unwrap();

スレッドへのデータ渡し

let numbers = vec![1, 2, 3];

let handle = thread::spawn(move || {
    let sum: i32 = numbers.iter().sum();
    println!("合計: {}", sum);
});

handle.join().unwrap();

2. Arc (Atomic Reference Counted)

問題: 複数のスレッドで同じデータを読みたい

// ❌ これはコンパイルエラー
let data = vec![1, 2, 3];

let handle1 = thread::spawn(move || println!("{:?}", data));
let handle2 = thread::spawn(move || println!("{:?}", data)); // エラー: dataは既に移動済み

解決策: Arcを使う

use std::sync::Arc;

let data = Arc::new(vec![1, 2, 3]);

let data1 = Arc::clone(&data);  // 参照カウントを増やす
let handle1 = thread::spawn(move || {
    println!("スレッド1: {:?}", data1);
});

let data2 = Arc::clone(&data);
let handle2 = thread::spawn(move || {
    println!("スレッド2: {:?}", data2);
});

println!("メイン: {:?}", data);

handle1.join().unwrap();
handle2.join().unwrap();

Arcの仕組み

初期状態:
Arc { data: [1,2,3], count: 1 }

Arc::clone後:
Arc { data: [1,2,3], count: 3 }
 ↓         ↓         ↓
data     data1     data2

drop後:
count: 3 → 2 → 1 → 0 (メモリ解放)

RcとArcの違い

use std::rc::Rc;
use std::sync::Arc;

// Rc: シングルスレッド用(高速)
let rc = Rc::new(5);
let rc_clone = Rc::clone(&rc);
// 参照カウントの増減は通常の加算/減算

// Arc: マルチスレッド用(やや遅い)
let arc = Arc::new(5);
let arc_clone = Arc::clone(&arc);
// 参照カウントの増減はアトミック操作(CPU命令)

3. Mutex (Mutual Exclusion)

問題: 複数のスレッドで同じデータを書き換えたい

// ❌ Arcだけでは書き換えられない
let data = Arc::new(vec![1, 2, 3]);
let data1 = Arc::clone(&data);

thread::spawn(move || {
    data1.push(4);  // エラー: Arc<T>はイミュータブル
});

解決策: Arc>パターン

use std::sync::{Arc, Mutex};

let data = Arc::new(Mutex::new(vec![1, 2, 3]));

let data1 = Arc::clone(&data);
let handle1 = thread::spawn(move || {
    let mut vec = data1.lock().unwrap();  // ロックを取得
    vec.push(4);
    // ロックは自動的に解放(Drop)
});

let data2 = Arc::clone(&data);
let handle2 = thread::spawn(move || {
    let mut vec = data2.lock().unwrap();
    vec.push(5);
});

handle1.join().unwrap();
handle2.join().unwrap();

println!("{:?}", data.lock().unwrap());  // [1, 2, 3, 4, 5]

Mutexのロックメカニズム

スレッド1              Mutex                スレッド2
   │                    │                      │
   ├─ lock() ──────>   ロック取得              │
   │                   (所有権獲得)            │
   │                    │                      │
   │                    │              <── lock() (待機)
   ├─ vec.push(4)       │                      │
   │                    │                      │
   ├─ drop(guard) ──>  ロック解放              │
   │                    │                      │
   │                    │              <── ロック取得
   │                    │                      │
   │                    │                  vec.push(5)

Mutexの注意点

// ⚠️ デッドロックの例
let data = Arc::new(Mutex::new(0));

let guard1 = data.lock().unwrap();  // ロックを取得
let guard2 = data.lock().unwrap();  // ⚠️ 同じスレッドで再度ロック取得 → デッドロック

// ✅ スコープでロックを制限
{
    let mut guard = data.lock().unwrap();
    *guard += 1;
}  // ここでロックが解放される

4. チャネル (Channels)

哲学: 「メモリを共有するのではなく、メッセージを共有せよ」

use std::sync::mpsc;  // multiple producer, single consumer
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let message = String::from("Hello from thread!");
        tx.send(message).unwrap();
        // messageはsendで移動されたので、以降使用不可
    });

    let received = rx.recv().unwrap();
    println!("受信: {}", received);
}

チャネルの種類

// 1. 非同期チャネル(バッファなし)
let (tx, rx) = mpsc::channel();
tx.send(1).unwrap();  // すぐに成功

// 2. 同期チャネル(バッファサイズ指定)
let (tx, rx) = mpsc::sync_channel(0);  // バッファサイズ0
thread::spawn(move || {
    tx.send(1).unwrap();  // 受信されるまでブロック
});
thread::sleep(Duration::from_secs(1));
println!("{}", rx.recv().unwrap());

複数の送信者

let (tx, rx) = mpsc::channel();

for i in 0..5 {
    let tx_clone = tx.clone();
    thread::spawn(move || {
        tx_clone.send(i).unwrap();
    });
}
drop(tx);  // 元のtxをdrop(重要!)

for received in rx {
    println!("受信: {}", received);
}

チャネル vs 共有メモリ

パターン メリット デメリット 適用例
チャネル ・データ競合なし
・疎結合
・コピーのオーバーヘッド ・プロデューサー/コンシューマー
・パイプライン処理
Arc+Mutex ・メモリ効率
・直接アクセス
・デッドロックのリスク
・密結合
・共有状態
・頻繁な読み書き

実世界での活用事例

1. Discord - Rustで5億5000万ユーザーの負荷を処理

背景: DiscordはGoで実装されていたメッセージルーティングサービスをRustに書き換えました。

課題: ガベージコレクション(GC)による2分ごとのレイテンシスパイク

Go実装:
レイテンシ: ────────┃────────┃────────┃
            (通常)  GC      GC      GC
                  (遅延)  (遅延)  (遅延)

Rust実装:
レイテンシ: ─────────────────────────────
            (常に低レイテンシ)

成果:

  • レイテンシの安定性向上
  • メモリ使用量が10%削減
  • CPU使用率が安定

使用技術: tokio(非同期ランタイム)、Arc、チャネル

参考: Why Discord is switching from Go to Rust

2. Cloudflare - 1日4兆リクエストをRustで処理

背景: CDNプロキシサーバーをCからRustに置き換え

Rustを選んだ理由:

  • メモリ安全性(Cのバッファオーバーフロー脆弱性を排除)
  • 並行処理の安全性
  • パフォーマンス(Cと同等)

アーキテクチャ:

リクエスト → [Rustプロキシ] ──┬── オリジンサーバー1
  4兆/日         ↓            ├── オリジンサーバー2
               Arc<Config>    └── オリジンサーバーN
                  ↓
            複数のワーカースレッド

参考: Cloudflare's Rust-based Pingora

3. AWS Lambda - Firecracker VMM

Firecracker: AWSのサーバーレス(Lambda, Fargate)を支えるマイクロVM

Rustが必要な理由:

  • セキュリティ: 数千のVMを1台のホストで実行
  • パフォーマンス: 125ms以下の起動時間
  • 並行処理: 複数のVMを安全に管理

並行処理の実装:

// 各VMは独立したスレッドで実行
let vm1 = Arc::new(Mutex::new(VirtualMachine::new()));
let vm2 = Arc::new(Mutex::new(VirtualMachine::new()));

thread::spawn(move || vm1.lock().unwrap().run());
thread::spawn(move || vm2.lock().unwrap().run());

参考: Firecracker design

4. Figma - デスクトップアプリのマルチスレッド処理

背景: Figmaのデスクトップアプリは大きなデザインファイルを処理

Rustの活用:

// ファイル読み込みを並行処理
let files = vec!["layer1.fig", "layer2.fig", "layer3.fig"];
let handles: Vec<_> = files.into_iter()
    .map(|file| {
        thread::spawn(move || {
            load_layer(file)
        })
    })
    .collect();

let layers: Vec<_> = handles.into_iter()
    .map(|h| h.join().unwrap())
    .collect();

成果: 10倍のパフォーマンス向上

参考: Rust in Production at Figma

5. Dropbox - 並行ファイル同期エンジン

Nucleus: Dropboxの新しい同期エンジン

並行処理の設計:

// ファイル変更の監視とアップロードを並行実行
let (file_tx, file_rx) = mpsc::channel();

// 監視スレッド
thread::spawn(move || {
    for event in file_watcher {
        file_tx.send(event).unwrap();
    }
});

// アップロードワーカー(複数)
for _ in 0..4 {
    let rx = file_rx.clone();
    thread::spawn(move || {
        for file in rx {
            upload_file(file);
        }
    });
}

参考: Rewriting Dropbox sync engine in Rust

非同期プログラミング(async/await)

スレッド vs 非同期

// スレッドベース: OSスレッドを使用(重い)
for i in 0..1000 {
    thread::spawn(move || {
        // 各スレッドは約2MBのスタックメモリ
    });
}
// 合計: 2GB以上のメモリ使用

// 非同期ベース: タスク(軽量)
for i in 0..1000 {
    tokio::spawn(async move {
        // タスクは数KBのメモリ
    });
}
// 合計: 数MB程度のメモリ使用

Tokioランタイム

use tokio;

#[tokio::main]
async fn main() {
    // 非同期関数を並行実行
    let task1 = tokio::spawn(async {
        // I/O処理など
        tokio::time::sleep(Duration::from_secs(1)).await;
        "task1 完了"
    });

    let task2 = tokio::spawn(async {
        tokio::time::sleep(Duration::from_secs(1)).await;
        "task2 完了"
    });

    let (result1, result2) = tokio::join!(task1, task2);
    println!("{:?}, {:?}", result1, result2);
}

非同期が適している場面:

  • I/O待ちが多い(ネットワーク、ファイル)
  • 大量の並行接続(Webサーバー)
  • メモリ効率が重要

スレッドが適している場面:

  • CPU集約的な処理
  • ブロッキングAPIの使用
  • シンプルな並行処理

メンタルモデル:並行処理の考え方

1. 所有権ベースの並行処理

従来の言語:
スレッド1 ──→ データ ←── スレッド2
            (競合の可能性)

Rust:
スレッド1 ──→ データ(所有)
スレッド2 ──→ 別のデータ(所有)
または
スレッド1 ──→ Arc ←── スレッド2
            (参照カウント)

2. チャネルとメッセージパッシング

プロデューサー          コンシューマー
     │                      │
     ├── メッセージ1 ──→   受信
     ├── メッセージ2 ──→   受信
     └── メッセージ3 ──→   受信

3. Arc + Mutex の階層

Arc: 共有所有権
  └─ Mutex: 排他制御
       └─ データ: 実際の値

よくある質問

Q1: ArcとMutexは常にセットで使う?

A: いいえ。読み取り専用ならArcだけで十分です:

// 読み取り専用 - Arcのみ
let config = Arc::new(Config { port: 8080 });

// 書き込みあり - Arc + Mutex
let counter = Arc::new(Mutex::new(0));

Q2: RwLock とは?

A: 読み取りロックと書き込みロックを分離:

use std::sync::RwLock;

let data = Arc::new(RwLock::new(vec![1, 2, 3]));

// 複数の読み取りは並行OK
let reader1 = data.read().unwrap();
let reader2 = data.read().unwrap();

// 書き込みは排他的
let mut writer = data.write().unwrap();
writer.push(4);

使い分け:

  • 読み取りが多い → RwLock
  • 書き込みが多い → Mutex(シンプル)
  • Q3: スレッドプールは?

    A: rayonクレートが便利です:

    use rayon::prelude::*;
    
    let sum: i32 = (0..1000)
        .into_par_iter()  // 並列イテレータ
        .map(|x| x * x)
        .sum();
    

    学習チェックリスト

    Day 5の課題に進む前に、以下を理解できているか確認しましょう:

  • [ ] SendとSyncトレイトの違いを説明できる
  • [ ] RcとArcの違いと使い分けができる
  • [ ] Arc::cloneが何をしているか理解している
  • [ ] Mutexのlock()が返す値の型を知っている
  • [ ] デッドロックが起きる原因を説明できる
  • [ ] チャネルでの所有権の移動を理解している
  • [ ] スレッドと非同期の使い分けができる

参考資料

公式ドキュメント

クレート

  • tokio - 非同期ランタイム
  • rayon - データ並列処理
  • crossbeam - 並行処理ユーティリティ

ブログ記事

Fearless Concurrency の世界へようこそ!