課題16: 並行処理 (Concurrency)

マンダトリー要件

問題1: 並列データ処理(30点)

以下の仕様に従って並列データ処理を実装しなさい。

use std::thread;
use std::sync::{Arc, Mutex};

// 大きな配列を並列に処理して合計を計算
fn parallel_sum(data: Vec<i32>, num_threads: usize) -> i32 {
    // TODO: 実装 (30点)
    // データを分割して各スレッドで処理
    // 結果をマージして返す
}

fn main() {
    let data: Vec<i32> = (1..=1000).collect();

    // シングルスレッド
    let expected: i32 = data.iter().sum();

    // マルチスレッド
    let result = parallel_sum(data.clone(), 4);

    assert_eq!(result, expected);
    println!("Problem 1 passed! Sum = {}", result);
}

評価基準:

  • データの分割: 10点
  • スレッド生成と実行: 10点
  • 結果のマージ: 10点

---

問題2: プロデューサー・コンシューマー(30点)

チャネルを使ってプロデューサー・コンシューマーパターンを実装しなさい。

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

struct Message {
    id: usize,
    content: String,
}

fn producer(tx: mpsc::Sender<Message>, id: usize, count: usize) {
    // TODO: 実装 (10点)
    // count個のメッセージを送信
    // 各メッセージは100msの間隔で送信
}

fn consumer(rx: mpsc::Receiver<Message>) -> Vec<Message> {
    // TODO: 実装 (10点)
    // すべてのメッセージを受信して返す
}

fn main() {
    let (tx, rx) = mpsc::channel();

    // 複数のプロデューサーを生成 (10点)
    // TODO: 3つのプロデューサースレッドを起動
    // 各プロデューサーは5つのメッセージを送信

    // コンシューマー
    let consumer_handle = thread::spawn(move || {
        consumer(rx)
    });

    let messages = consumer_handle.join().unwrap();
    println!("Received {} messages", messages.len());
    assert_eq!(messages.len(), 15);  // 3 producers * 5 messages

    println!("Problem 2 passed!");
}

評価基準:

  • プロデューサーの実装: 10点
  • コンシューマーの実装: 10点
  • 複数プロデューサーの管理: 10点

---

問題3: スレッドセーフなキャッシュ(20点)

Arc>を使ってスレッドセーフなキャッシュを実装しなさい。

use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::thread;

struct Cache {
    data: Arc<Mutex<HashMap<String, i32>>>,
}

impl Cache {
    fn new() -> Self {
        // TODO: 実装 (5点)
    }

    fn get(&self, key: &str) -> Option<i32> {
        // TODO: 実装 (5点)
    }

    fn set(&self, key: String, value: i32) {
        // TODO: 実装 (5点)
    }

    fn clone_cache(&self) -> Cache {
        // TODO: 実装 (5点)
    }
}

fn main() {
    let cache = Cache::new();
    let mut handles = vec![];

    // 複数のスレッドからキャッシュにアクセス
    for i in 0..10 {
        let cache = cache.clone_cache();
        let handle = thread::spawn(move || {
            cache.set(format!("key{}", i), i as i32);
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }

    // 結果を確認
    for i in 0..10 {
        assert_eq!(cache.get(&format!("key{}", i)), Some(i as i32));
    }

    println!("Problem 3 passed!");
}

評価基準:

  • newの実装: 5点
  • getの実装: 5点
  • setの実装: 5点
  • clone_cacheの実装: 5点

---

ボーナス課題

ボーナス1: ワーカープール(10点)

シンプルなワーカープールを実装しなさい。

use std::sync::mpsc;
use std::sync::{Arc, Mutex};
use std::thread;

type Job = Box<dyn FnOnce() + Send + 'static>;

struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

struct Worker {
    id: usize,
    thread: Option<thread::JoinHandle<()>>,
}

impl ThreadPool {
    fn new(size: usize) -> ThreadPool {
        // TODO: 実装
    }

    fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        // TODO: 実装
    }
}

impl Worker {
    fn new(
        id: usize,
        receiver: Arc<Mutex<mpsc::Receiver<Job>>>,
    ) -> Worker {
        // TODO: 実装
    }
}

fn main() {
    let pool = ThreadPool::new(4);

    for i in 0..10 {
        pool.execute(move || {
            println!("Job {} executing", i);
            thread::sleep(std::time::Duration::from_millis(100));
        });
    }

    thread::sleep(std::time::Duration::from_secs(2));
    println!("Bonus 1 passed!");
}

評価基準:

  • ThreadPool::newの実装: 3点
  • Worker::newの実装: 4点
  • executeの実装: 3点

---

ボーナス2: 並列クイックソート(10点)

並列版クイックソートを実装しなさい。

use std::thread;

fn parallel_quicksort(mut data: Vec<i32>) -> Vec<i32> {
    // TODO: 実装
    // 基準値より大きい/小さいデータを別スレッドでソート
    // 深さ制限を設けてスレッド数を制御すること
}

fn main() {
    let mut data: Vec<i32> = (1..=100).rev().collect();

    let sorted = parallel_quicksort(data);

    let expected: Vec<i32> = (1..=100).collect();
    assert_eq!(sorted, expected);

    println!("Bonus 2 passed!");
}

評価基準:

  • 基本的なクイックソート: 5点
  • 並列化の実装: 5点

---

ボーナス3: レート リミッター(10点)

一定時間内のリクエスト数を制限する仕組みを実装しなさい。

use std::sync::{Arc, Mutex};
use std::thread;
use std::time::{Duration, Instant};

struct RateLimiter {
    max_requests: usize,
    window: Duration,
    requests: Arc<Mutex<Vec<Instant>>>,
}

impl RateLimiter {
    fn new(max_requests: usize, window: Duration) -> Self {
        // TODO: 実装
    }

    fn allow(&self) -> bool {
        // TODO: 実装
        // 時間窓内のリクエスト数をチェック
        // 許可ならtrue、制限超過ならfalse
    }

    fn clone_limiter(&self) -> RateLimiter {
        // TODO: 実装
    }
}

fn main() {
    let limiter = RateLimiter::new(5, Duration::from_secs(1));
    let mut handles = vec![];

    for i in 0..10 {
        let limiter = limiter.clone_limiter();
        let handle = thread::spawn(move || {
            if limiter.allow() {
                println!("Request {} allowed", i);
            } else {
                println!("Request {} rate limited", i);
            }
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }

    println!("Bonus 3 passed!");
}

評価基準:

  • 基本的な実装: 5点
  • 古いリクエストの削除: 5点
  • ---

    評価基準

    マンダトリー部分(80点)

    項目 配点 評価ポイント
    問題1:並列データ処理 30点 スレッド分割と結果マージ
    問題2:メッセージパッシング 30点 チャネルの正しい使用
    問題3:スレッドセーフキャッシュ 20点 Mutex/Arcの理解

    ボーナス部分(20点)

    項目 配点 評価ポイント
    ボーナス1:ワーカープール 10点 実用的なパターン
    ボーナス2:並列ソート 10点 再帰的な並列化
    ボーナス3:レートリミッター 10点 時間制御の実装

    : ボーナスは最大20点まで加算されます。

    ---

    提出方法

    ファイル構成

    rust-foundations-16/
    ├── src/
    │   ├── problem1.rs
    │   ├── problem2.rs
    │   ├── problem3.rs
    │   ├── bonus1.rs       # オプション
    │   ├── bonus2.rs       # オプション
    │   └── bonus3.rs       # オプション
    ├── Cargo.toml
    └── README.md
    

    テスト

    cargo test --release
    

    提出期限

  • マンダトリー:第16章学習後、1週間以内
  • ボーナス:第18章修了時まで
  • ---

    ヒント

    問題1のヒント

    let chunk_size = data.len() / num_threads;
    let chunks: Vec<Vec<i32>> = data
        .chunks(chunk_size)
        .map(|chunk| chunk.to_vec())
        .collect();
    
    let results = Arc::new(Mutex::new(vec![]));
    

    問題2のヒント

    // プロデューサー
    for i in 0..count {
        let msg = Message {
            id: producer_id * 1000 + i,
            content: format!("Message from producer {}", producer_id),
        };
        tx.send(msg).unwrap();
        thread::sleep(Duration::from_millis(100));
    }
    

    問題3のヒント

    fn get(&self, key: &str) -> Option<i32> {
        let data = self.data.lock().unwrap();
        data.get(key).copied()
    }
    

    ---

    学習の確認

    この課題を通じて、以下を理解できたか確認してください:

  • [ ] スレッドの生成と管理
  • [ ] Send/Syncトレイトの意味
  • [ ] チャネルによるメッセージパッシング
  • [ ] Mutexによる共有状態の管理
  • [ ] Arcによる複数所有権
  • [ ] デッドロックの危険性
  • [ ] 並行処理のパターン

次の章では、非同期プログラミングについて学びます。async/await、Future、tokioを理解します。