課題16: 非同期と並行処理

マンダトリー要件 (80点)

Part 1: 基本的なスレッド (20点)

並行処理の基礎を実装してください:

ファイル名: part1_threads.zig

const std = @import("std");

// TODO: 以下の関数を実装してください

// 1. N個の素数を見つけるワーカー関数
fn findPrimes(start: u32, count: u32, results: []u32) void {
    // ここに実装
    // start から count 個の素数を見つけて results に格納
}

// 2. 並列でフィボナッチ数を計算
fn fibonacci(n: u32) u64 {
    // ここに実装
}

// 3. 複数のスレッドでフィボナッチ数列を計算し、結果を集約
pub fn parallelFibonacci(allocator: std.mem.Allocator, count: u32) ![]u64 {
    // ここに実装
    // count 個のフィボナッチ数を並列計算
}

// 4. ワーカースレッドで大量の計算を実行
const ComputeTask = struct {
    id: u32,
    data: []const u32,
    result: u64,
};

fn computeWorker(task: *ComputeTask) void {
    // ここに実装
    // data の全要素の合計を result に格納
}

pub fn main() !void {
    const allocator = std.heap.page_allocator;

    // フィボナッチの並列計算
    const fib_results = try parallelFibonacci(allocator, 10);
    defer allocator.free(fib_results);

    std.debug.print("Fibonacci results:\n", .{});
    for (fib_results, 0..) |result, i| {
        std.debug.print("F({}) = {}\n", .{ i, result });
    }

    // 計算タスクの並列実行
    const num_tasks = 4;
    var tasks: [num_tasks]ComputeTask = undefined;
    var threads: [num_tasks]std.Thread = undefined;

    for (&tasks, 0..) |*task, i| {
        const data = try allocator.alloc(u32, 1000);
        for (data, 0..) |*val, j| {
            val.* = @intCast(i * 1000 + j);
        }

        task.* = ComputeTask{
            .id = @intCast(i),
            .data = data,
            .result = 0,
        };

        threads[i] = try std.Thread.spawn(.{}, computeWorker, .{task});
    }

    for (threads) |thread| {
        thread.join();
    }

    for (tasks) |task| {
        std.debug.print("Task {} result: {}\n", .{ task.id, task.result });
        allocator.free(task.data);
    }
}

要件:

  • スレッドの生成と終了が正しく行われる
  • 計算結果が正確である
  • リソースリークがない

Part 2: 同期とデータ共有 (20点)

ミューテックスとアトミック操作を使用してください:

ファイル名: part2_synchronization.zig

const std = @import("std");

// TODO: 以下の構造体を実装してください

// 1. スレッドセーフなカウンター
pub const SafeCounter = struct {
    value: i64,
    mutex: std.Thread.Mutex,

    pub fn init() SafeCounter {
        // ここに実装
    }

    pub fn increment(self: *SafeCounter) void {
        // ここに実装
    }

    pub fn decrement(self: *SafeCounter) void {
        // ここに実装
    }

    pub fn get(self: *SafeCounter) i64 {
        // ここに実装
    }
};

// 2. スレッドセーフな配列
pub const SafeArray = struct {
    items: std.ArrayList(u32),
    mutex: std.Thread.Mutex,

    pub fn init(allocator: std.mem.Allocator) SafeArray {
        // ここに実装
    }

    pub fn deinit(self: *SafeArray) void {
        // ここに実装
    }

    pub fn append(self: *SafeArray, item: u32) !void {
        // ここに実装
    }

    pub fn get(self: *SafeArray, index: usize) ?u32 {
        // ここに実装
    }

    pub fn len(self: *SafeArray) usize {
        // ここに実装
    }
};

// 3. アトミックカウンター
pub const AtomicCounter = struct {
    value: std.atomic.Value(u64),

    pub fn init() AtomicCounter {
        // ここに実装
    }

    pub fn increment(self: *AtomicCounter) void {
        // ここに実装
    }

    pub fn get(self: *AtomicCounter) u64 {
        // ここに実装
    }
};

// 4. 読み書きロック付きバッファ
pub const SharedBuffer = struct {
    data: [1024]u8,
    rwlock: std.Thread.RwLock,

    pub fn init() SharedBuffer {
        // ここに実装
    }

    pub fn read(self: *SharedBuffer, offset: usize, len: usize) []const u8 {
        // ここに実装
    }

    pub fn write(self: *SharedBuffer, offset: usize, data: []const u8) void {
        // ここに実装
    }
};

fn counterTest(counter: *SafeCounter, id: usize) void {
    var i: usize = 0;
    while (i < 10000) : (i += 1) {
        if (id % 2 == 0) {
            counter.increment();
        } else {
            counter.decrement();
        }
    }
}

pub fn main() !void {
    const allocator = std.heap.page_allocator;

    // カウンターのテスト
    var counter = SafeCounter.init();
    const num_threads = 4;
    var threads: [num_threads]std.Thread = undefined;

    for (&threads, 0..) |*thread, i| {
        thread.* = try std.Thread.spawn(.{}, counterTest, .{ &counter, i });
    }

    for (threads) |thread| {
        thread.join();
    }

    std.debug.print("Final counter value: {}\n", .{counter.get()});

    // 配列のテスト
    var array = SafeArray.init(allocator);
    defer array.deinit();

    var i: u32 = 0;
    while (i < 100) : (i += 1) {
        try array.append(i);
    }

    std.debug.print("Array length: {}\n", .{array.len()});

    // アトミックカウンターのテスト
    var atomic_counter = AtomicCounter.init();
    // TODO: テストを実装
}

要件:

  • データ競合が発生しない
  • デッドロックが発生しない
  • 期待通りの結果が得られる

Part 3: Producer-Consumerパターン (20点)

キューを使用したProducer-Consumerを実装してください:

ファイル名: part3_producer_consumer.zig

const std = @import("std");

// TODO: 以下の構造体を実装してください

pub const BoundedQueue = struct {
    items: std.ArrayList(u32),
    capacity: usize,
    mutex: std.Thread.Mutex,
    not_empty: std.Thread.Condition,
    not_full: std.Thread.Condition,
    shutdown: bool,

    pub fn init(allocator: std.mem.Allocator, capacity: usize) BoundedQueue {
        // ここに実装
    }

    pub fn deinit(self: *BoundedQueue) void {
        // ここに実装
    }

    pub fn enqueue(self: *BoundedQueue, item: u32) !void {
        // ここに実装
        // キューが満杯の場合は待機
    }

    pub fn dequeue(self: *BoundedQueue) ?u32 {
        // ここに実装
        // キューが空の場合は待機
        // shutdown が true で空なら null を返す
    }

    pub fn close(self: *BoundedQueue) void {
        // ここに実装
        // shutdown フラグを立てて全スレッドを起こす
    }
};

fn producer(queue: *BoundedQueue, id: usize, count: u32) void {
    // TODO: 実装
    // count 個のアイテムをキューに入れる
    var i: u32 = 0;
    while (i < count) : (i += 1) {
        const item = @as(u32, @intCast(id * 10000 + i));
        // enqueue を呼び出す
        std.debug.print("Producer {} produced: {}\n", .{ id, item });
    }
}

fn consumer(queue: *BoundedQueue, id: usize) void {
    // TODO: 実装
    // キューからアイテムを取り出して処理
    while (queue.dequeue()) |item| {
        std.debug.print("Consumer {} consumed: {}\n", .{ id, item });
        std.time.sleep(std.time.ns_per_ms * 10);
    }
    std.debug.print("Consumer {} shutting down\n", .{id});
}

pub fn main() !void {
    const allocator = std.heap.page_allocator;
    var queue = BoundedQueue.init(allocator, 10);
    defer queue.deinit();

    const num_producers = 2;
    const num_consumers = 3;

    var producer_threads: [num_producers]std.Thread = undefined;
    var consumer_threads: [num_consumers]std.Thread = undefined;

    // コンシューマーを起動
    for (&consumer_threads, 0..) |*thread, i| {
        thread.* = try std.Thread.spawn(.{}, consumer, .{ &queue, i });
    }

    // プロデューサーを起動
    for (&producer_threads, 0..) |*thread, i| {
        thread.* = try std.Thread.spawn(.{}, producer, .{ &queue, i, 50 });
    }

    // プロデューサーの完了を待つ
    for (producer_threads) |thread| {
        thread.join();
    }

    // キューを閉じる
    std.time.sleep(std.time.ns_per_s);
    queue.close();

    // コンシューマーの完了を待つ
    for (consumer_threads) |thread| {
        thread.join();
    }
}

要件:

  • キューの容量制限が正しく機能
  • 条件変数を使用した待機
  • シャットダウン処理が適切

Part 4: スレッドプール (20点)

汎用的なスレッドプールを実装してください:

ファイル名: part4_thread_pool.zig

const std = @import("std");

// TODO: 以下の構造体を実装してください

pub const Task = struct {
    func: *const fn (data: *anyopaque) void,
    data: *anyopaque,
};

pub const ThreadPool = struct {
    threads: []std.Thread,
    queue: std.ArrayList(Task),
    mutex: std.Thread.Mutex,
    condition: std.Thread.Condition,
    shutdown: bool,
    allocator: std.mem.Allocator,

    pub fn init(allocator: std.mem.Allocator, num_threads: usize) !ThreadPool {
        // ここに実装
        // ワーカースレッドを起動
    }

    pub fn deinit(self: *ThreadPool) void {
        // ここに実装
        // シャットダウンして全スレッドを終了
    }

    pub fn submit(self: *ThreadPool, task: Task) !void {
        // ここに実装
        // タスクをキューに追加
    }

    fn workerThread(self: *ThreadPool) void {
        // ここに実装
        // キューからタスクを取り出して実行
    }
};

// テスト用のタスク
const TaskData = struct {
    id: u32,
    result: *u64,
};

fn computeTask(data: *anyopaque) void {
    const task_data = @as(*TaskData, @ptrCast(@alignCast(data)));

    var sum: u64 = 0;
    var i: u64 = 0;
    while (i < 1_000_000) : (i += 1) {
        sum += i;
    }

    task_data.result.* = sum;
    std.debug.print("Task {} completed with result: {}\n", .{ task_data.id, sum });
}

pub fn main() !void {
    const allocator = std.heap.page_allocator;

    var pool = try ThreadPool.init(allocator, 4);
    defer pool.deinit();

    const num_tasks = 10;
    var task_data: [num_tasks]TaskData = undefined;
    var results: [num_tasks]u64 = undefined;

    for (&task_data, 0..) |*data, i| {
        data.* = TaskData{
            .id = @intCast(i),
            .result = &results[i],
        };

        try pool.submit(.{
            .func = computeTask,
            .data = data,
        });
    }

    // タスク完了を待つ
    std.time.sleep(std.time.ns_per_s * 2);

    std.debug.print("\nAll tasks completed\n", .{});
    for (results, 0..) |result, i| {
        std.debug.print("Task {} final result: {}\n", .{ i, result });
    }
}

要件:

  • 指定された数のワーカースレッド
  • タスクキューの管理
  • グレースフルシャットダウン
  • ボーナス課題 (20点)

    Bonus 1: 並列マージソート (10点)

    並列処理を使用したマージソートを実装してください:

    ファイル名: bonus1_parallel_sort.zig

    const std = @import("std");
    
    pub fn parallelMergeSort(allocator: std.mem.Allocator, arr: []u32, threshold: usize) !void {
        // TODO: 実装
        // threshold より小さい場合は通常のソート
        // それ以上の場合は並列処理
    }
    
    fn merge(allocator: std.mem.Allocator, left: []u32, right: []u32, result: []u32) !void {
        // TODO: 実装
    }
    
    pub fn main() !void {
        const allocator = std.heap.page_allocator;
    
        // 大きな配列を生成
        const size = 1_000_000;
        var arr = try allocator.alloc(u32, size);
        defer allocator.free(arr);
    
        var rng = std.rand.DefaultPrng.init(42);
        for (arr) |*val| {
            val.* = rng.random().int(u32);
        }
    
        const start = std.time.milliTimestamp();
        try parallelMergeSort(allocator, arr, 10000);
        const end = std.time.milliTimestamp();
    
        // ソート結果の検証
        var i: usize = 1;
        while (i < arr.len) : (i += 1) {
            if (arr[i - 1] > arr[i]) {
                std.debug.print("Sort failed at index {}\n", .{i});
                return;
            }
        }
    
        std.debug.print("Sort completed in {} ms\n", .{end - start});
    }
    

    Bonus 2: ワークスティーリング (10点)

    ワークスティーリングアルゴリズムを実装してください:

    ファイル名: bonus2_work_stealing.zig

    const std = @import("std");
    
    pub const WorkStealingQueue = struct {
        items: std.ArrayList(Task),
        mutex: std.Thread.Mutex,
    
        pub fn init(allocator: std.mem.Allocator) WorkStealingQueue {
            // TODO: 実装
        }
    
        pub fn deinit(self: *WorkStealingQueue) void {
            // TODO: 実装
        }
    
        pub fn push(self: *WorkStealingQueue, task: Task) !void {
            // TODO: 実装
        }
    
        pub fn pop(self: *WorkStealingQueue) ?Task {
            // TODO: 実装
        }
    
        pub fn steal(self: *WorkStealingQueue) ?Task {
            // TODO: 実装
            // キューの先頭から盗む
        }
    };
    
    pub const Task = struct {
        id: u32,
        work_amount: u32,
    };
    
    pub const WorkStealingPool = struct {
        queues: []WorkStealingQueue,
        threads: []std.Thread,
        shutdown: std.atomic.Value(bool),
        allocator: std.mem.Allocator,
    
        pub fn init(allocator: std.mem.Allocator, num_threads: usize) !WorkStealingPool {
            // TODO: 実装
        }
    
        pub fn deinit(self: *WorkStealingPool) void {
            // TODO: 実装
        }
    
        pub fn submit(self: *WorkStealingPool, worker_id: usize, task: Task) !void {
            // TODO: 実装
        }
    
        fn workerThread(self: *WorkStealingPool, worker_id: usize) void {
            // TODO: 実装
            // 自分のキューから取得、なければ他のワーカーから盗む
        }
    };
    
    pub fn main() !void {
        const allocator = std.heap.page_allocator;
    
        var pool = try WorkStealingPool.init(allocator, 4);
        defer pool.deinit();
    
        // タスクを投入
        var i: u32 = 0;
        while (i < 100) : (i += 1) {
            try pool.submit(i % 4, Task{
                .id = i,
                .work_amount = 1000,
            });
        }
    
        std.time.sleep(std.time.ns_per_s * 3);
    }
    

    評価基準

    項目 配点
    Part 1: 基本的なスレッド 20点
    Part 2: 同期とデータ共有 20点
    Part 3: Producer-Consumer 20点
    Part 4: スレッドプール 20点
    **マンダトリー合計** **80点**
    Bonus 1: 並列マージソート 10点
    Bonus 2: ワークスティーリング 10点
    **ボーナス合計** **20点**

    合格基準

  • マンダトリー: 64点以上で合格(80%)
  • ボーナス: 追加評価
  • 提出方法

    exercise16/
    ├── part1_threads.zig
    ├── part2_synchronization.zig
    ├── part3_producer_consumer.zig
    ├── part4_thread_pool.zig
    ├── bonus1_parallel_sort.zig
    └── bonus2_work_stealing.zig
    

    テスト実行

    zig build-exe part1_threads.zig
    ./part1_threads
    
    zig build-exe part2_synchronization.zig
    ./part2_synchronization
    
    zig build-exe part3_producer_consumer.zig
    ./part3_producer_consumer
    
    zig build-exe part4_thread_pool.zig
    ./part4_thread_pool
    

    ヒント

  • スレッドの生成:
const thread = try std.Thread.spawn(.{}, function, .{arg});
thread.join();

  • ミューテックスの使用:
mutex.lock();
defer mutex.unlock();

  • 条件変数:
condition.wait(&mutex);
condition.signal();
condition.broadcast();

  • アトミック操作:
_ = atomic_value.fetchAdd(1, .monotonic);

参考資料

  • Zig Thread Documentation: https://ziglang.org/documentation/master/std/#std.Thread
  • Zig Atomics: https://ziglang.org/documentation/master/std/#std.atomic