課題16: 並行処理 (Concurrency)
マンダトリー要件
問題1: 並列データ処理(30点)
以下の仕様に従って並列データ処理を実装しなさい。
use std::thread;
use std::sync::{Arc, Mutex};
// 大きな配列を並列に処理して合計を計算
fn parallel_sum(data: Vec<i32>, num_threads: usize) -> i32 {
// TODO: 実装 (30点)
// データを分割して各スレッドで処理
// 結果をマージして返す
}
fn main() {
let data: Vec<i32> = (1..=1000).collect();
// シングルスレッド
let expected: i32 = data.iter().sum();
// マルチスレッド
let result = parallel_sum(data.clone(), 4);
assert_eq!(result, expected);
println!("Problem 1 passed! Sum = {}", result);
}
評価基準:
- データの分割: 10点
- スレッド生成と実行: 10点
- 結果のマージ: 10点
---
問題2: プロデューサー・コンシューマー(30点)
チャネルを使ってプロデューサー・コンシューマーパターンを実装しなさい。
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
struct Message {
id: usize,
content: String,
}
fn producer(tx: mpsc::Sender<Message>, id: usize, count: usize) {
// TODO: 実装 (10点)
// count個のメッセージを送信
// 各メッセージは100msの間隔で送信
}
fn consumer(rx: mpsc::Receiver<Message>) -> Vec<Message> {
// TODO: 実装 (10点)
// すべてのメッセージを受信して返す
}
fn main() {
let (tx, rx) = mpsc::channel();
// 複数のプロデューサーを生成 (10点)
// TODO: 3つのプロデューサースレッドを起動
// 各プロデューサーは5つのメッセージを送信
// コンシューマー
let consumer_handle = thread::spawn(move || {
consumer(rx)
});
let messages = consumer_handle.join().unwrap();
println!("Received {} messages", messages.len());
assert_eq!(messages.len(), 15); // 3 producers * 5 messages
println!("Problem 2 passed!");
}
評価基準:
- プロデューサーの実装: 10点
- コンシューマーの実装: 10点
- 複数プロデューサーの管理: 10点
---
問題3: スレッドセーフなキャッシュ(20点)
Arcを使ってスレッドセーフなキャッシュを実装しなさい。
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::thread;
struct Cache {
data: Arc<Mutex<HashMap<String, i32>>>,
}
impl Cache {
fn new() -> Self {
// TODO: 実装 (5点)
}
fn get(&self, key: &str) -> Option<i32> {
// TODO: 実装 (5点)
}
fn set(&self, key: String, value: i32) {
// TODO: 実装 (5点)
}
fn clone_cache(&self) -> Cache {
// TODO: 実装 (5点)
}
}
fn main() {
let cache = Cache::new();
let mut handles = vec![];
// 複数のスレッドからキャッシュにアクセス
for i in 0..10 {
let cache = cache.clone_cache();
let handle = thread::spawn(move || {
cache.set(format!("key{}", i), i as i32);
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
// 結果を確認
for i in 0..10 {
assert_eq!(cache.get(&format!("key{}", i)), Some(i as i32));
}
println!("Problem 3 passed!");
}
評価基準:
newの実装: 5点getの実装: 5点setの実装: 5点clone_cacheの実装: 5点
---
ボーナス課題
ボーナス1: ワーカープール(10点)
シンプルなワーカープールを実装しなさい。
use std::sync::mpsc;
use std::sync::{Arc, Mutex};
use std::thread;
type Job = Box<dyn FnOnce() + Send + 'static>;
struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}
struct Worker {
id: usize,
thread: Option<thread::JoinHandle<()>>,
}
impl ThreadPool {
fn new(size: usize) -> ThreadPool {
// TODO: 実装
}
fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
// TODO: 実装
}
}
impl Worker {
fn new(
id: usize,
receiver: Arc<Mutex<mpsc::Receiver<Job>>>,
) -> Worker {
// TODO: 実装
}
}
fn main() {
let pool = ThreadPool::new(4);
for i in 0..10 {
pool.execute(move || {
println!("Job {} executing", i);
thread::sleep(std::time::Duration::from_millis(100));
});
}
thread::sleep(std::time::Duration::from_secs(2));
println!("Bonus 1 passed!");
}
評価基準:
ThreadPool::newの実装: 3点Worker::newの実装: 4点executeの実装: 3点
---
ボーナス2: 並列クイックソート(10点)
並列版クイックソートを実装しなさい。
use std::thread;
fn parallel_quicksort(mut data: Vec<i32>) -> Vec<i32> {
// TODO: 実装
// 基準値より大きい/小さいデータを別スレッドでソート
// 深さ制限を設けてスレッド数を制御すること
}
fn main() {
let mut data: Vec<i32> = (1..=100).rev().collect();
let sorted = parallel_quicksort(data);
let expected: Vec<i32> = (1..=100).collect();
assert_eq!(sorted, expected);
println!("Bonus 2 passed!");
}
評価基準:
- 基本的なクイックソート: 5点
- 並列化の実装: 5点
---
ボーナス3: レート リミッター(10点)
一定時間内のリクエスト数を制限する仕組みを実装しなさい。
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::{Duration, Instant};
struct RateLimiter {
max_requests: usize,
window: Duration,
requests: Arc<Mutex<Vec<Instant>>>,
}
impl RateLimiter {
fn new(max_requests: usize, window: Duration) -> Self {
// TODO: 実装
}
fn allow(&self) -> bool {
// TODO: 実装
// 時間窓内のリクエスト数をチェック
// 許可ならtrue、制限超過ならfalse
}
fn clone_limiter(&self) -> RateLimiter {
// TODO: 実装
}
}
fn main() {
let limiter = RateLimiter::new(5, Duration::from_secs(1));
let mut handles = vec![];
for i in 0..10 {
let limiter = limiter.clone_limiter();
let handle = thread::spawn(move || {
if limiter.allow() {
println!("Request {} allowed", i);
} else {
println!("Request {} rate limited", i);
}
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
println!("Bonus 3 passed!");
}
評価基準:
- 基本的な実装: 5点
- 古いリクエストの削除: 5点
---
評価基準
マンダトリー部分(80点)
| 項目 | 配点 | 評価ポイント |
|---|---|---|
| 問題1:並列データ処理 | 30点 | スレッド分割と結果マージ |
| 問題2:メッセージパッシング | 30点 | チャネルの正しい使用 |
| 問題3:スレッドセーフキャッシュ | 20点 | Mutex/Arcの理解 |
ボーナス部分(20点)
| 項目 | 配点 | 評価ポイント |
|---|---|---|
| ボーナス1:ワーカープール | 10点 | 実用的なパターン |
| ボーナス2:並列ソート | 10点 | 再帰的な並列化 |
| ボーナス3:レートリミッター | 10点 | 時間制御の実装 |
注: ボーナスは最大20点まで加算されます。
---
提出方法
ファイル構成
rust-foundations-16/
├── src/
│ ├── problem1.rs
│ ├── problem2.rs
│ ├── problem3.rs
│ ├── bonus1.rs # オプション
│ ├── bonus2.rs # オプション
│ └── bonus3.rs # オプション
├── Cargo.toml
└── README.md
テスト
cargo test --release
提出期限
---
ヒント
問題1のヒント
let chunk_size = data.len() / num_threads;
let chunks: Vec<Vec<i32>> = data
.chunks(chunk_size)
.map(|chunk| chunk.to_vec())
.collect();
let results = Arc::new(Mutex::new(vec![]));
問題2のヒント
// プロデューサー
for i in 0..count {
let msg = Message {
id: producer_id * 1000 + i,
content: format!("Message from producer {}", producer_id),
};
tx.send(msg).unwrap();
thread::sleep(Duration::from_millis(100));
}
問題3のヒント
fn get(&self, key: &str) -> Option<i32> {
let data = self.data.lock().unwrap();
data.get(key).copied()
}
---
学習の確認
この課題を通じて、以下を理解できたか確認してください:
次の章では、非同期プログラミングについて学びます。async/await、Future、tokioを理解します。