課題16: 非同期と並行処理
マンダトリー要件 (80点)
Part 1: 基本的なスレッド (20点)
並行処理の基礎を実装してください:
ファイル名: part1_threads.zig
const std = @import("std");
// TODO: 以下の関数を実装してください
// 1. N個の素数を見つけるワーカー関数
fn findPrimes(start: u32, count: u32, results: []u32) void {
// ここに実装
// start から count 個の素数を見つけて results に格納
}
// 2. 並列でフィボナッチ数を計算
fn fibonacci(n: u32) u64 {
// ここに実装
}
// 3. 複数のスレッドでフィボナッチ数列を計算し、結果を集約
pub fn parallelFibonacci(allocator: std.mem.Allocator, count: u32) ![]u64 {
// ここに実装
// count 個のフィボナッチ数を並列計算
}
// 4. ワーカースレッドで大量の計算を実行
const ComputeTask = struct {
id: u32,
data: []const u32,
result: u64,
};
fn computeWorker(task: *ComputeTask) void {
// ここに実装
// data の全要素の合計を result に格納
}
pub fn main() !void {
const allocator = std.heap.page_allocator;
// フィボナッチの並列計算
const fib_results = try parallelFibonacci(allocator, 10);
defer allocator.free(fib_results);
std.debug.print("Fibonacci results:\n", .{});
for (fib_results, 0..) |result, i| {
std.debug.print("F({}) = {}\n", .{ i, result });
}
// 計算タスクの並列実行
const num_tasks = 4;
var tasks: [num_tasks]ComputeTask = undefined;
var threads: [num_tasks]std.Thread = undefined;
for (&tasks, 0..) |*task, i| {
const data = try allocator.alloc(u32, 1000);
for (data, 0..) |*val, j| {
val.* = @intCast(i * 1000 + j);
}
task.* = ComputeTask{
.id = @intCast(i),
.data = data,
.result = 0,
};
threads[i] = try std.Thread.spawn(.{}, computeWorker, .{task});
}
for (threads) |thread| {
thread.join();
}
for (tasks) |task| {
std.debug.print("Task {} result: {}\n", .{ task.id, task.result });
allocator.free(task.data);
}
}
要件:
- スレッドの生成と終了が正しく行われる
- 計算結果が正確である
- リソースリークがない
Part 2: 同期とデータ共有 (20点)
ミューテックスとアトミック操作を使用してください:
ファイル名: part2_synchronization.zig
const std = @import("std");
// TODO: 以下の構造体を実装してください
// 1. スレッドセーフなカウンター
pub const SafeCounter = struct {
value: i64,
mutex: std.Thread.Mutex,
pub fn init() SafeCounter {
// ここに実装
}
pub fn increment(self: *SafeCounter) void {
// ここに実装
}
pub fn decrement(self: *SafeCounter) void {
// ここに実装
}
pub fn get(self: *SafeCounter) i64 {
// ここに実装
}
};
// 2. スレッドセーフな配列
pub const SafeArray = struct {
items: std.ArrayList(u32),
mutex: std.Thread.Mutex,
pub fn init(allocator: std.mem.Allocator) SafeArray {
// ここに実装
}
pub fn deinit(self: *SafeArray) void {
// ここに実装
}
pub fn append(self: *SafeArray, item: u32) !void {
// ここに実装
}
pub fn get(self: *SafeArray, index: usize) ?u32 {
// ここに実装
}
pub fn len(self: *SafeArray) usize {
// ここに実装
}
};
// 3. アトミックカウンター
pub const AtomicCounter = struct {
value: std.atomic.Value(u64),
pub fn init() AtomicCounter {
// ここに実装
}
pub fn increment(self: *AtomicCounter) void {
// ここに実装
}
pub fn get(self: *AtomicCounter) u64 {
// ここに実装
}
};
// 4. 読み書きロック付きバッファ
pub const SharedBuffer = struct {
data: [1024]u8,
rwlock: std.Thread.RwLock,
pub fn init() SharedBuffer {
// ここに実装
}
pub fn read(self: *SharedBuffer, offset: usize, len: usize) []const u8 {
// ここに実装
}
pub fn write(self: *SharedBuffer, offset: usize, data: []const u8) void {
// ここに実装
}
};
fn counterTest(counter: *SafeCounter, id: usize) void {
var i: usize = 0;
while (i < 10000) : (i += 1) {
if (id % 2 == 0) {
counter.increment();
} else {
counter.decrement();
}
}
}
pub fn main() !void {
const allocator = std.heap.page_allocator;
// カウンターのテスト
var counter = SafeCounter.init();
const num_threads = 4;
var threads: [num_threads]std.Thread = undefined;
for (&threads, 0..) |*thread, i| {
thread.* = try std.Thread.spawn(.{}, counterTest, .{ &counter, i });
}
for (threads) |thread| {
thread.join();
}
std.debug.print("Final counter value: {}\n", .{counter.get()});
// 配列のテスト
var array = SafeArray.init(allocator);
defer array.deinit();
var i: u32 = 0;
while (i < 100) : (i += 1) {
try array.append(i);
}
std.debug.print("Array length: {}\n", .{array.len()});
// アトミックカウンターのテスト
var atomic_counter = AtomicCounter.init();
// TODO: テストを実装
}
要件:
- データ競合が発生しない
- デッドロックが発生しない
- 期待通りの結果が得られる
Part 3: Producer-Consumerパターン (20点)
キューを使用したProducer-Consumerを実装してください:
ファイル名: part3_producer_consumer.zig
const std = @import("std");
// TODO: 以下の構造体を実装してください
pub const BoundedQueue = struct {
items: std.ArrayList(u32),
capacity: usize,
mutex: std.Thread.Mutex,
not_empty: std.Thread.Condition,
not_full: std.Thread.Condition,
shutdown: bool,
pub fn init(allocator: std.mem.Allocator, capacity: usize) BoundedQueue {
// ここに実装
}
pub fn deinit(self: *BoundedQueue) void {
// ここに実装
}
pub fn enqueue(self: *BoundedQueue, item: u32) !void {
// ここに実装
// キューが満杯の場合は待機
}
pub fn dequeue(self: *BoundedQueue) ?u32 {
// ここに実装
// キューが空の場合は待機
// shutdown が true で空なら null を返す
}
pub fn close(self: *BoundedQueue) void {
// ここに実装
// shutdown フラグを立てて全スレッドを起こす
}
};
fn producer(queue: *BoundedQueue, id: usize, count: u32) void {
// TODO: 実装
// count 個のアイテムをキューに入れる
var i: u32 = 0;
while (i < count) : (i += 1) {
const item = @as(u32, @intCast(id * 10000 + i));
// enqueue を呼び出す
std.debug.print("Producer {} produced: {}\n", .{ id, item });
}
}
fn consumer(queue: *BoundedQueue, id: usize) void {
// TODO: 実装
// キューからアイテムを取り出して処理
while (queue.dequeue()) |item| {
std.debug.print("Consumer {} consumed: {}\n", .{ id, item });
std.time.sleep(std.time.ns_per_ms * 10);
}
std.debug.print("Consumer {} shutting down\n", .{id});
}
pub fn main() !void {
const allocator = std.heap.page_allocator;
var queue = BoundedQueue.init(allocator, 10);
defer queue.deinit();
const num_producers = 2;
const num_consumers = 3;
var producer_threads: [num_producers]std.Thread = undefined;
var consumer_threads: [num_consumers]std.Thread = undefined;
// コンシューマーを起動
for (&consumer_threads, 0..) |*thread, i| {
thread.* = try std.Thread.spawn(.{}, consumer, .{ &queue, i });
}
// プロデューサーを起動
for (&producer_threads, 0..) |*thread, i| {
thread.* = try std.Thread.spawn(.{}, producer, .{ &queue, i, 50 });
}
// プロデューサーの完了を待つ
for (producer_threads) |thread| {
thread.join();
}
// キューを閉じる
std.time.sleep(std.time.ns_per_s);
queue.close();
// コンシューマーの完了を待つ
for (consumer_threads) |thread| {
thread.join();
}
}
要件:
- キューの容量制限が正しく機能
- 条件変数を使用した待機
- シャットダウン処理が適切
Part 4: スレッドプール (20点)
汎用的なスレッドプールを実装してください:
ファイル名: part4_thread_pool.zig
const std = @import("std");
// TODO: 以下の構造体を実装してください
pub const Task = struct {
func: *const fn (data: *anyopaque) void,
data: *anyopaque,
};
pub const ThreadPool = struct {
threads: []std.Thread,
queue: std.ArrayList(Task),
mutex: std.Thread.Mutex,
condition: std.Thread.Condition,
shutdown: bool,
allocator: std.mem.Allocator,
pub fn init(allocator: std.mem.Allocator, num_threads: usize) !ThreadPool {
// ここに実装
// ワーカースレッドを起動
}
pub fn deinit(self: *ThreadPool) void {
// ここに実装
// シャットダウンして全スレッドを終了
}
pub fn submit(self: *ThreadPool, task: Task) !void {
// ここに実装
// タスクをキューに追加
}
fn workerThread(self: *ThreadPool) void {
// ここに実装
// キューからタスクを取り出して実行
}
};
// テスト用のタスク
const TaskData = struct {
id: u32,
result: *u64,
};
fn computeTask(data: *anyopaque) void {
const task_data = @as(*TaskData, @ptrCast(@alignCast(data)));
var sum: u64 = 0;
var i: u64 = 0;
while (i < 1_000_000) : (i += 1) {
sum += i;
}
task_data.result.* = sum;
std.debug.print("Task {} completed with result: {}\n", .{ task_data.id, sum });
}
pub fn main() !void {
const allocator = std.heap.page_allocator;
var pool = try ThreadPool.init(allocator, 4);
defer pool.deinit();
const num_tasks = 10;
var task_data: [num_tasks]TaskData = undefined;
var results: [num_tasks]u64 = undefined;
for (&task_data, 0..) |*data, i| {
data.* = TaskData{
.id = @intCast(i),
.result = &results[i],
};
try pool.submit(.{
.func = computeTask,
.data = data,
});
}
// タスク完了を待つ
std.time.sleep(std.time.ns_per_s * 2);
std.debug.print("\nAll tasks completed\n", .{});
for (results, 0..) |result, i| {
std.debug.print("Task {} final result: {}\n", .{ i, result });
}
}
要件:
- 指定された数のワーカースレッド
- タスクキューの管理
- グレースフルシャットダウン
ボーナス課題 (20点)
Bonus 1: 並列マージソート (10点)
並列処理を使用したマージソートを実装してください:
ファイル名: bonus1_parallel_sort.zig
const std = @import("std");
pub fn parallelMergeSort(allocator: std.mem.Allocator, arr: []u32, threshold: usize) !void {
// TODO: 実装
// threshold より小さい場合は通常のソート
// それ以上の場合は並列処理
}
fn merge(allocator: std.mem.Allocator, left: []u32, right: []u32, result: []u32) !void {
// TODO: 実装
}
pub fn main() !void {
const allocator = std.heap.page_allocator;
// 大きな配列を生成
const size = 1_000_000;
var arr = try allocator.alloc(u32, size);
defer allocator.free(arr);
var rng = std.rand.DefaultPrng.init(42);
for (arr) |*val| {
val.* = rng.random().int(u32);
}
const start = std.time.milliTimestamp();
try parallelMergeSort(allocator, arr, 10000);
const end = std.time.milliTimestamp();
// ソート結果の検証
var i: usize = 1;
while (i < arr.len) : (i += 1) {
if (arr[i - 1] > arr[i]) {
std.debug.print("Sort failed at index {}\n", .{i});
return;
}
}
std.debug.print("Sort completed in {} ms\n", .{end - start});
}
Bonus 2: ワークスティーリング (10点)
ワークスティーリングアルゴリズムを実装してください:
ファイル名: bonus2_work_stealing.zig
const std = @import("std");
pub const WorkStealingQueue = struct {
items: std.ArrayList(Task),
mutex: std.Thread.Mutex,
pub fn init(allocator: std.mem.Allocator) WorkStealingQueue {
// TODO: 実装
}
pub fn deinit(self: *WorkStealingQueue) void {
// TODO: 実装
}
pub fn push(self: *WorkStealingQueue, task: Task) !void {
// TODO: 実装
}
pub fn pop(self: *WorkStealingQueue) ?Task {
// TODO: 実装
}
pub fn steal(self: *WorkStealingQueue) ?Task {
// TODO: 実装
// キューの先頭から盗む
}
};
pub const Task = struct {
id: u32,
work_amount: u32,
};
pub const WorkStealingPool = struct {
queues: []WorkStealingQueue,
threads: []std.Thread,
shutdown: std.atomic.Value(bool),
allocator: std.mem.Allocator,
pub fn init(allocator: std.mem.Allocator, num_threads: usize) !WorkStealingPool {
// TODO: 実装
}
pub fn deinit(self: *WorkStealingPool) void {
// TODO: 実装
}
pub fn submit(self: *WorkStealingPool, worker_id: usize, task: Task) !void {
// TODO: 実装
}
fn workerThread(self: *WorkStealingPool, worker_id: usize) void {
// TODO: 実装
// 自分のキューから取得、なければ他のワーカーから盗む
}
};
pub fn main() !void {
const allocator = std.heap.page_allocator;
var pool = try WorkStealingPool.init(allocator, 4);
defer pool.deinit();
// タスクを投入
var i: u32 = 0;
while (i < 100) : (i += 1) {
try pool.submit(i % 4, Task{
.id = i,
.work_amount = 1000,
});
}
std.time.sleep(std.time.ns_per_s * 3);
}
評価基準
| 項目 | 配点 |
|---|---|
| Part 1: 基本的なスレッド | 20点 |
| Part 2: 同期とデータ共有 | 20点 |
| Part 3: Producer-Consumer | 20点 |
| Part 4: スレッドプール | 20点 |
| **マンダトリー合計** | **80点** |
| Bonus 1: 並列マージソート | 10点 |
| Bonus 2: ワークスティーリング | 10点 |
| **ボーナス合計** | **20点** |
合格基準
提出方法
exercise16/
├── part1_threads.zig
├── part2_synchronization.zig
├── part3_producer_consumer.zig
├── part4_thread_pool.zig
├── bonus1_parallel_sort.zig
└── bonus2_work_stealing.zig
テスト実行
zig build-exe part1_threads.zig
./part1_threads
zig build-exe part2_synchronization.zig
./part2_synchronization
zig build-exe part3_producer_consumer.zig
./part3_producer_consumer
zig build-exe part4_thread_pool.zig
./part4_thread_pool
ヒント
const thread = try std.Thread.spawn(.{}, function, .{arg});
thread.join();
- ミューテックスの使用:
mutex.lock();
defer mutex.unlock();
- 条件変数:
condition.wait(&mutex);
condition.signal();
condition.broadcast();
- アトミック操作:
_ = atomic_value.fetchAdd(1, .monotonic);
参考資料
- Zig Thread Documentation: https://ziglang.org/documentation/master/std/#std.Thread
- Zig Atomics: https://ziglang.org/documentation/master/std/#std.atomic