第16章: 非同期と並行処理

学習目標

この章を終えると、以下ができるようになります:

  • Zigのスレッドを作成し、並行処理を実装する
  • ミューテックスとアトミック操作でデータ競合を防ぐ
  • async/awaitの基本概念を理解する(実験的機能)
  • スレッドプールを使用して効率的な並行処理を実装する
  • イベントループの基本を理解する

    並行処理の基礎

    なぜ並行処理が必要か

    現代のコンピュータはマルチコアプロセッサを搭載しており、並行処理により以下が可能になります:

  • パフォーマンス向上: 複数のCPUコアを活用
  • レスポンシブ性: I/O待機中も他の処理を実行
  • スケーラビリティ: 負荷に応じて処理を分散
    シングルスレッド(1つずつ順番に実行):
    ┌────────┬────────┬────────┐
    │ Task 1 │ Task 2 │ Task 3 │
    └────────┴────────┴────────┘
      時間 →

    マルチスレッド(複数のコアで同時に実行):
    ┌────────┐
    │ Task 1 │  ← コア1
    ├────────┤
    │ Task 2 │  ← コア2
    ├────────┤
    │ Task 3 │  ← コア3
    └────────┘
      時間 →
    

    並行性 vs 並列性

    重要な違い:

  • 並行性(Concurrency): 複数のタスクを扱う能力
  • 並列性(Parallelism): 複数のタスクを同時に実行
    // 並行性: 1つのコアで複数タスクを切り替えながら進める
    // 並列性: 複数のコアで同時に実行する
    

    Zigのスレッド

    スレッドの作成

    const std = @import("std");
    
    /// Body of the thread started in the basic spawn example.
    ///
    /// Prints the value behind `arg`, then prints a progress line five times
    /// (sleeping 100 ms after each one) and finally announces completion.
    fn threadFunction(arg: *u32) void {
        const initial = arg.*;
        std.debug.print("Thread started with arg: {}\n", .{initial});

        const step_delay_ns = 100 * std.time.ns_per_ms;
        for (0..5) |step| {
            std.debug.print("Thread working: {}\n", .{step});
            std.time.sleep(step_delay_ns);
        }

        std.debug.print("Thread finished\n", .{});
    }
    
    /// Spawns one thread running `threadFunction`, does some work on the calling
    /// thread, and joins the spawned thread before returning.
    ///
    /// `value` lives on this function's stack; handing its address to the thread
    /// is safe only because the thread is joined before the function returns.
    pub fn basicThreadExample() !void {
        var value: u32 = 42;
        const handle = try std.Thread.spawn(.{}, threadFunction, .{&value});

        // This runs concurrently with the spawned thread.
        std.debug.print("Main thread working\n", .{});

        // Block until the spawned thread has returned.
        handle.join();
        std.debug.print("All threads completed\n", .{});
    }
    

    複数のスレッド

    const std = @import("std");
    
    /// CPU-bound job for the multi-thread example: sums 0..999_999 and prints
    /// the result tagged with this worker's `id`.
    fn worker(id: usize) void {
        std.debug.print("Worker {} started\n", .{id});

        var total: usize = 0;
        for (0..1_000_000) |n| total += n;

        std.debug.print("Worker {} finished with sum: {}\n", .{ id, total });
    }
    
    /// Starts `num_threads` threads, each running `worker(i)`, and waits for all
    /// of them to finish.
    ///
    /// If a `spawn` fails part-way, the threads that did start are joined before
    /// the error propagates, so no thread handle is leaked.
    pub fn multipleThreadsExample() !void {
        const num_threads = 4;
        var threads: [num_threads]std.Thread = undefined;

        // Number of leading entries in `threads` that hold a live handle.
        // `errdefer` sees the final value when the function exits with an error.
        var spawned: usize = 0;
        errdefer {
            for (threads[0..spawned]) |thread| thread.join();
        }

        // Start all threads.
        for (&threads, 0..) |*thread, i| {
            thread.* = try std.Thread.spawn(.{}, worker, .{i});
            spawned += 1;
        }

        // Wait for every thread to complete.
        for (threads) |thread| {
            thread.join();
        }

        std.debug.print("All workers completed\n", .{});
    }
    

    スレッド間のデータ共有

    スレッド間でデータを共有する場合、データ競合に注意が必要です:

    const std = @import("std");
    
    /// A `u32` counter guarded by a mutex so several threads can bump it safely.
    const SharedData = struct {
        counter: u32,
        mutex: std.Thread.Mutex,

        /// Returns an instance with the counter at zero and the mutex unlocked.
        pub fn init() SharedData {
            return .{ .counter = 0, .mutex = .{} };
        }

        /// Adds one to the counter while holding the mutex.
        pub fn increment(self: *SharedData) void {
            self.mutex.lock();
            defer self.mutex.unlock();
            self.counter += 1;
        }

        /// Returns the counter value as observed while holding the mutex.
        pub fn get(self: *SharedData) u32 {
            self.mutex.lock();
            const snapshot = self.counter;
            self.mutex.unlock();
            return snapshot;
        }
    };
    
    /// Thread body: increments `shared` 10_000 times through its mutex-protected API.
    fn incrementWorker(shared: *SharedData) void {
        for (0..10000) |_| {
            shared.increment();
        }
    }
    
    /// Runs four `incrementWorker` threads against one mutex-protected counter
    /// and prints the final value.
    ///
    /// Every spawned thread holds a pointer to the stack-local `shared`, so all
    /// of them must be joined before this function returns. That includes the
    /// case where a later `spawn` fails, which the `errdefer` below handles.
    pub fn sharedDataExample() !void {
        var shared = SharedData.init();

        const num_threads = 4;
        var threads: [num_threads]std.Thread = undefined;

        // Join already-started threads if a later spawn fails, so none of them
        // outlives `shared`.
        var spawned: usize = 0;
        errdefer {
            for (threads[0..spawned]) |thread| thread.join();
        }

        for (&threads) |*thread| {
            thread.* = try std.Thread.spawn(.{}, incrementWorker, .{&shared});
            spawned += 1;
        }

        for (threads) |thread| {
            thread.join();
        }

        std.debug.print("Final counter: {}\n", .{shared.get()});
        // Expected: 40000 (4 threads × 10000)
    }
    

    同期プリミティブ

    ミューテックス(Mutex)

    排他制御を実現:

    const std = @import("std");
    
    /// Signed counter whose every operation is serialized by a mutex.
    pub const Counter = struct {
        value: i32,
        mutex: std.Thread.Mutex,

        /// Creates a counter starting at zero.
        pub fn init() Counter {
            return .{ .value = 0, .mutex = .{} };
        }

        /// Adds one under the lock.
        pub fn increment(self: *Counter) void {
            self.adjust(1);
        }

        /// Subtracts one under the lock.
        pub fn decrement(self: *Counter) void {
            self.adjust(-1);
        }

        /// Returns the current value, read under the lock.
        pub fn get(self: *Counter) i32 {
            self.mutex.lock();
            defer self.mutex.unlock();
            return self.value;
        }

        /// Shared implementation of `increment`/`decrement`: adds `delta` while
        /// holding the mutex (overflow still traps in safe builds).
        fn adjust(self: *Counter, delta: i32) void {
            self.mutex.lock();
            defer self.mutex.unlock();
            self.value += delta;
        }
    };
    

    読み書きロック(RwLock)

    読み込みは並行、書き込みは排他:

    const std = @import("std");
    
    /// Fixed 100-byte buffer guarded by a reader-writer lock: readers take the
    /// lock in shared mode, writers take it exclusively.
    pub const SharedResource = struct {
        data: [100]u8,
        rwlock: std.Thread.RwLock,

        /// Returns a zero-filled buffer with an unlocked lock.
        pub fn init() SharedResource {
            return .{
                .data = std.mem.zeroes([100]u8),
                .rwlock = .{},
            };
        }

        /// Returns `data[index]` under a shared (read) lock.
        /// `index` must be < 100; this is bounds-checked in safe builds.
        pub fn read(self: *SharedResource, index: usize) u8 {
            const lock = &self.rwlock;
            lock.lockShared();
            defer lock.unlockShared();
            return self.data[index];
        }

        /// Stores `value` at `data[index]` under the exclusive (write) lock.
        pub fn write(self: *SharedResource, index: usize, value: u8) void {
            const lock = &self.rwlock;
            lock.lock();
            defer lock.unlock();
            self.data[index] = value;
        }
    };
    
    /// Reads slot `id % 100` ten times, printing each value and pausing 50 ms
    /// after every read.
    fn reader(resource: *SharedResource, id: usize) void {
        const slot = id % 100;
        for (0..10) |_| {
            std.debug.print("Reader {} read: {}\n", .{ id, resource.read(slot) });
            std.time.sleep(50 * std.time.ns_per_ms);
        }
    }
    
    /// Writes a marker byte derived from `id` into slot `id % 100` five times,
    /// pausing 100 ms after every write.
    ///
    /// The marker is the low byte of `id` (`@truncate`). With `@intCast` to
    /// `u8`, any id >= 256 would trip a safety panic in safe builds and be UB
    /// in release builds, even though the slot index already wraps with `% 100`.
    fn writer(resource: *SharedResource, id: usize) void {
        const slot = id % 100;
        const marker: u8 = @truncate(id);
        var i: usize = 0;
        while (i < 5) : (i += 1) {
            resource.write(slot, marker);
            std.debug.print("Writer {} wrote\n", .{id});
            std.time.sleep(std.time.ns_per_ms * 100);
        }
    }
    

    アトミック操作

    ロックフリーな同期:

    const std = @import("std");
    
    /// Lock-free `u32` counter built on `std.atomic.Value`.
    ///
    /// All operations use `.monotonic` ordering: each update is atomic, and the
    /// counter is not used to publish any other memory.
    pub const AtomicCounter = struct {
        value: std.atomic.Value(u32),

        /// Creates a counter starting at zero.
        pub fn init() AtomicCounter {
            return AtomicCounter{
                .value = std.atomic.Value(u32).init(0),
            };
        }

        /// Atomically adds one (wraps on overflow, like `fetchAdd`).
        pub fn increment(self: *AtomicCounter) void {
            _ = self.value.fetchAdd(1, .monotonic);
        }

        /// Returns the current value.
        pub fn get(self: *AtomicCounter) u32 {
            return self.value.load(.monotonic);
        }

        /// Atomically replaces the value with `desired` if it currently equals
        /// `expected`. Returns true iff the swap happened.
        ///
        /// Uses the strong variant. `cmpxchgWeak` may fail spuriously (e.g. on
        /// LL/SC architectures) even when the value matches, which would make a
        /// single-shot call like this one report `false` wrongly. Weak CAS is
        /// only appropriate inside a retry loop.
        pub fn compareAndSwap(self: *AtomicCounter, expected: u32, desired: u32) bool {
            return self.value.cmpxchgStrong(expected, desired, .monotonic, .monotonic) == null;
        }
    };
    
    /// Thread body: performs 10_000 atomic increments on `counter`.
    fn atomicWorker(counter: *AtomicCounter) void {
        for (0..10000) |_| counter.increment();
    }
    
    /// Runs four `atomicWorker` threads against one lock-free counter and prints
    /// the final value.
    ///
    /// Each thread holds a pointer to the stack-local `counter`, so every thread
    /// that was started is joined before returning, including when a later
    /// `spawn` fails (see the `errdefer`).
    pub fn atomicExample() !void {
        var counter = AtomicCounter.init();

        const num_threads = 4;
        var threads: [num_threads]std.Thread = undefined;

        // Join already-started threads if a later spawn fails.
        var spawned: usize = 0;
        errdefer {
            for (threads[0..spawned]) |thread| thread.join();
        }

        for (&threads) |*thread| {
            thread.* = try std.Thread.spawn(.{}, atomicWorker, .{&counter});
            spawned += 1;
        }

        for (threads) |thread| {
            thread.join();
        }

        std.debug.print("Atomic counter: {}\n", .{counter.get()});
    }
    

    スレッドプール

    基本的なスレッドプール

    const std = @import("std");
    
    /// Unit of work for `ThreadPool`: a type-erased callback plus its argument.
    /// A worker thread invokes `func(data)`; the memory behind `data` must stay
    /// valid until that call has run.
    pub const Task = struct {
        func: *const fn (data: *anyopaque) void,
        data: *anyopaque,
    };
    
    /// Fixed-size pool of worker threads that run submitted `Task`s in FIFO order.
    ///
    /// The queue, mutex, condition variable and shutdown flag live in a
    /// heap-allocated `State`. Worker threads therefore hold a pointer that stays
    /// valid after `init` returns the pool by value. The previous version handed
    /// the workers `&pool`, a local of `init`, which dangled as soon as `init`
    /// returned.
    pub const ThreadPool = struct {
        threads: []std.Thread,
        state: *State,
        allocator: std.mem.Allocator,

        /// Data shared between the owner and every worker; guarded by `mutex`.
        const State = struct {
            queue: std.ArrayList(Task),
            mutex: std.Thread.Mutex = .{},
            condition: std.Thread.Condition = .{},
            shutdown: bool = false,
        };

        /// Allocates the shared state and starts `num_threads` workers.
        /// On error, any workers already started are stopped and joined and all
        /// memory allocated here is released.
        pub fn init(allocator: std.mem.Allocator, num_threads: usize) !ThreadPool {
            const state = try allocator.create(State);
            errdefer allocator.destroy(state);
            state.* = .{ .queue = std.ArrayList(Task).init(allocator) };
            errdefer state.queue.deinit();

            const threads = try allocator.alloc(std.Thread, num_threads);
            errdefer allocator.free(threads);

            // `spawned` is read when the errdefer fires, so only live handles are joined.
            var spawned: usize = 0;
            errdefer stopAndJoin(state, threads[0..spawned]);
            for (threads) |*thread| {
                thread.* = try std.Thread.spawn(.{}, workerThread, .{state});
                spawned += 1;
            }

            return .{ .threads = threads, .state = state, .allocator = allocator };
        }

        /// Signals shutdown and waits for the workers to exit, then frees all
        /// resources. Workers drain the queue before exiting, so every task
        /// submitted before this call still runs.
        pub fn deinit(self: *ThreadPool) void {
            stopAndJoin(self.state, self.threads);
            self.state.queue.deinit();
            self.allocator.destroy(self.state);
            self.allocator.free(self.threads);
        }

        /// Enqueues `task` and wakes one idle worker. Fails only on allocation failure.
        pub fn submit(self: *ThreadPool, task: Task) !void {
            const state = self.state;
            state.mutex.lock();
            defer state.mutex.unlock();

            try state.queue.append(task);
            state.condition.signal();
        }

        /// Sets the shutdown flag, wakes every worker, and joins `threads`.
        fn stopAndJoin(state: *State, threads: []const std.Thread) void {
            state.mutex.lock();
            state.shutdown = true;
            state.condition.broadcast();
            state.mutex.unlock();

            for (threads) |thread| {
                thread.join();
            }
        }

        /// Worker loop: pops tasks in FIFO order and runs them outside the lock;
        /// returns once shutdown is requested and the queue is empty.
        fn workerThread(state: *State) void {
            while (true) {
                state.mutex.lock();

                while (state.queue.items.len == 0 and !state.shutdown) {
                    state.condition.wait(&state.mutex);
                }

                if (state.shutdown and state.queue.items.len == 0) {
                    state.mutex.unlock();
                    break;
                }

                const task = state.queue.orderedRemove(0);
                state.mutex.unlock();

                task.func(task.data);
            }
        }
    };
    
    // 使用例
    /// Sample task: interprets `data` as a `*u32`, prints the value, then sleeps
    /// 100 ms to simulate work. `data` must really point at an aligned `u32`.
    fn exampleTask(data: *anyopaque) void {
        const number: *u32 = @ptrCast(@alignCast(data));
        std.debug.print("Processing task with value: {}\n", .{number.*});
        std.time.sleep(100 * std.time.ns_per_ms);
    }
    
    /// Submits ten `exampleTask`s to a 4-thread `ThreadPool` and waits for all
    /// of them to run.
    ///
    /// No fixed sleep is needed. `ThreadPool.deinit` sets the shutdown flag and
    /// joins the workers, and a worker only exits once the queue is empty. So
    /// every submitted task has run by the time the deferred `deinit` returns,
    /// however long the tasks take.
    pub fn threadPoolExample() !void {
        const allocator = std.heap.page_allocator;

        // Declared before the pool so it clearly outlives every task that
        // points into it.
        var values = [_]u32{ 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };

        var pool = try ThreadPool.init(allocator, 4);
        defer pool.deinit(); // waits for all queued tasks to finish

        for (&values) |*value| {
            try pool.submit(.{
                .func = exampleTask,
                .data = value,
            });
        }
    }
    

    標準ライブラリによる並行処理

    標準ライブラリのスレッドプール

    const std = @import("std");
    
    /// Returns 0 + 1 + ... + (n - 1) as a `u64`.
    /// `allocator` is accepted for signature compatibility but is not used.
    fn computeTask(allocator: std.mem.Allocator, n: usize) !u64 {
        _ = allocator;
        var total: u64 = 0;
        for (0..n) |k| total += k;
        return total;
    }
    
    /// Computes sum(0 .. (i + 1) * 1000) for i in 0..10 on `std.Thread.Pool`
    /// and prints every result once all jobs have finished.
    ///
    /// `results` is allocated before the pool. Defers run in reverse order, so
    /// the deferred `pool.deinit()` (which joins the worker threads) now always
    /// runs before `results` is freed. With the old ordering, a failing
    /// `pool.spawn` freed `results` while earlier jobs could still be writing
    /// to it.
    pub fn stdThreadPoolExample() !void {
        const allocator = std.heap.page_allocator;

        const results = try allocator.alloc(u64, 10);
        defer allocator.free(results);

        // Create the thread pool.
        var pool: std.Thread.Pool = undefined;
        try pool.init(.{ .allocator = allocator });
        defer pool.deinit();

        const WaitGroup = std.Thread.WaitGroup;
        var wg: WaitGroup = .{};

        for (results, 0..) |*result, i| {
            wg.start();
            try pool.spawn(struct {
                fn run(r: *u64, index: usize, wait_group: *WaitGroup) void {
                    defer wait_group.finish();
                    r.* = blk: {
                        var sum: u64 = 0;
                        var j: usize = 0;
                        while (j < (index + 1) * 1000) : (j += 1) {
                            sum += j;
                        }
                        break :blk sum;
                    };
                }
            }.run, .{ result, i, &wg });
        }

        // Run queued jobs on this thread too, until every job has called finish().
        pool.waitAndWork(&wg);

        for (results, 0..) |result, i| {
            std.debug.print("Task {} result: {}\n", .{ i, result });
        }
    }
    

    Async/Await(実験的)

    非同期関数の基本

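    注意: async/awaitはZig 0.10以前の(stage1)コンパイラにのみ存在した機能で、Zig 0.11以降のセルフホストコンパイラではサポートされていません(言語機能として再設計中です)。以下の例は概念説明用であり、特に`suspend`を使うコードは現行のコンパイラではコンパイルできません。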

    const std = @import("std");
    
    /// Returns 0 + 1 + ... + (n - 1) as a `u32`; overflow traps in safe builds.
    /// Despite its name this is an ordinary synchronous function.
    fn asyncComputation(n: u32) u32 {
        var total: u32 = 0;
        for (0..n) |k| {
            const term: u32 = @intCast(k);
            total += term;
        }
        return total;
    }
    
    /// Prints a start message, then computes `asyncComputation(1000)` and prints
    /// the result. Runs synchronously on the calling thread; no `async` is used.
    fn asyncTask() void {
        std.debug.print("Async task started\n", .{});
        std.debug.print("Async task result: {}\n", .{asyncComputation(1000)});
    }
    
    // 注意: async/await の構文は将来変更される可能性があります
    

    フレームの概念

    // An async function has a frame that holds its state while it is suspended.
    // NOTE(review): `suspend` is not accepted by Zig 0.11+ (self-hosted)
    // compilers while async is being redesigned; this snippet is conceptual only.
    // Confirm against the targeted Zig version.
    fn asyncFunction() void {
        suspend {
            // Execution is suspended here.
        }
        // Work that runs after the function is resumed.
    }
    

    並行パターン

    Producer-Consumer パターン

    const std = @import("std");
    
    /// Bounded, blocking FIFO of `u32` values for producer/consumer threads.
    ///
    /// `enqueue` blocks while the queue holds `capacity` items and `dequeue`
    /// blocks while it is empty. `capacity` must be at least 1; with 0,
    /// `enqueue` would wait forever.
    const Queue = struct {
        items: std.ArrayList(u32),
        mutex: std.Thread.Mutex,
        not_empty: std.Thread.Condition,
        not_full: std.Thread.Condition,
        capacity: usize,

        /// Creates an empty queue; nothing is allocated until the first `enqueue`.
        pub fn init(allocator: std.mem.Allocator, capacity: usize) Queue {
            return Queue{
                .items = std.ArrayList(u32).init(allocator),
                .mutex = std.Thread.Mutex{},
                .not_empty = std.Thread.Condition{},
                .not_full = std.Thread.Condition{},
                .capacity = capacity,
            };
        }

        /// Frees the item buffer. Call only after every thread is done with the queue.
        pub fn deinit(self: *Queue) void {
            self.items.deinit();
        }

        /// Appends `item`, blocking while the queue is full, then wakes one
        /// waiting consumer. Panics if the buffer cannot grow (out of memory).
        pub fn enqueue(self: *Queue, item: u32) void {
            self.mutex.lock();
            defer self.mutex.unlock();

            while (self.items.items.len >= self.capacity) {
                self.not_full.wait(&self.mutex);
            }

            // The allocation can genuinely fail, and `catch unreachable` would be
            // UB in release builds. The signature has no error set, so panic
            // explicitly instead.
            self.items.append(item) catch @panic("Queue.enqueue: out of memory");
            self.not_empty.signal();
        }

        /// Removes and returns the oldest item, blocking while the queue is
        /// empty, then wakes one waiting producer.
        pub fn dequeue(self: *Queue) u32 {
            self.mutex.lock();
            defer self.mutex.unlock();

            while (self.items.items.len == 0) {
                self.not_empty.wait(&self.mutex);
            }

            const item = self.items.orderedRemove(0);
            self.not_full.signal();
            return item;
        }
    };
    
    /// Pushes the ten items `id * 100 + 0` .. `id * 100 + 9` into `queue`,
    /// logging each one and pausing 100 ms after every push.
    fn producer(queue: *Queue, id: usize) void {
        for (0..10) |i| {
            const item: u32 = @intCast(id * 100 + i);
            queue.enqueue(item);
            std.debug.print("Producer {} produced: {}\n", .{ id, item });
            std.time.sleep(100 * std.time.ns_per_ms);
        }
    }
    
    /// Takes ten items from `queue` (blocking while it is empty), logging each
    /// one and pausing 150 ms after every take.
    fn consumer(queue: *Queue, id: usize) void {
        var remaining: u32 = 10;
        while (remaining > 0) : (remaining -= 1) {
            const item = queue.dequeue();
            std.debug.print("Consumer {} consumed: {}\n", .{ id, item });
            std.time.sleep(150 * std.time.ns_per_ms);
        }
    }
    

    ワーカープールパターン

    const std = @import("std");
    
    /// One job for the worker-pool example: `id` labels the job and `data` is its
    /// payload (in this example both are only printed by the worker).
    const WorkItem = struct {
        id: u32,
        data: u32,
    };
    
    /// Unbounded FIFO of `WorkItem`s shared by a pool of worker threads.
    ///
    /// After `close`, `pop` keeps returning queued items until the queue is
    /// empty and then returns null, which tells workers to exit.
    const WorkQueue = struct {
        items: std.ArrayList(WorkItem),
        mutex: std.Thread.Mutex,
        condition: std.Thread.Condition,
        shutdown: bool,

        /// Creates an open, empty queue backed by `allocator`.
        pub fn init(allocator: std.mem.Allocator) WorkQueue {
            return .{
                .items = std.ArrayList(WorkItem).init(allocator),
                .mutex = .{},
                .condition = .{},
                .shutdown = false,
            };
        }

        /// Frees the item buffer. Call only after all workers have stopped using the queue.
        pub fn deinit(self: *WorkQueue) void {
            self.items.deinit();
        }

        /// Appends `item` and wakes one waiting worker; fails only if the buffer cannot grow.
        pub fn push(self: *WorkQueue, item: WorkItem) !void {
            self.mutex.lock();
            defer self.mutex.unlock();
            try self.items.append(item);
            self.condition.signal();
        }

        /// Blocks until an item is available and returns the oldest one, or
        /// returns null once the queue is closed and empty.
        pub fn pop(self: *WorkQueue) ?WorkItem {
            self.mutex.lock();
            defer self.mutex.unlock();
            while (true) {
                if (self.items.items.len != 0) return self.items.orderedRemove(0);
                if (self.shutdown) return null;
                self.condition.wait(&self.mutex);
            }
        }

        /// Marks the queue closed and wakes every waiting worker so they can drain and exit.
        pub fn close(self: *WorkQueue) void {
            self.mutex.lock();
            defer self.mutex.unlock();
            self.shutdown = true;
            self.condition.broadcast();
        }
    };
    
    /// Worker-pool thread body: processes items until `queue.pop()` returns null
    /// (queue closed and drained), then logs its shutdown.
    fn worker(queue: *WorkQueue, id: usize) void {
        while (true) {
            const item = queue.pop() orelse break;
            std.debug.print("Worker {} processing item {}: {}\n", .{ id, item.id, item.data });
            std.time.sleep(100 * std.time.ns_per_ms);
        }
        std.debug.print("Worker {} shutting down\n", .{id});
    }
    

    パフォーマンス最適化

    キャッシュラインの考慮

    const std = @import("std");
    
    // 偽共有(False Sharing)を避ける
    /// Atomic `u64` counter padded to a full cache line. Counters placed next to
    /// each other (e.g. one per thread in an array) therefore never share a line
    /// (avoids false sharing).
    ///
    /// The line size comes from `std.atomic.cache_line`, which depends on the
    /// target (e.g. 128 bytes on x86_64 and aarch64, where adjacent-line
    /// prefetching makes 64 bytes insufficient). Previously it was hard-coded
    /// to 64.
    pub const PaddedCounter = struct {
        value: std.atomic.Value(u64) align(std.atomic.cache_line),
        padding: [pad_len]u8 = [_]u8{0} ** pad_len,

        /// Bytes needed after `value` to fill the rest of the cache line.
        const pad_len = std.atomic.cache_line - @sizeOf(std.atomic.Value(u64));

        /// Creates a counter starting at zero.
        pub fn init() PaddedCounter {
            return PaddedCounter{
                .value = std.atomic.Value(u64).init(0),
            };
        }

        /// Atomically adds one.
        pub fn increment(self: *PaddedCounter) void {
            _ = self.value.fetchAdd(1, .monotonic);
        }
    };
    

    ロックフリーデータ構造

    const std = @import("std");
    
    /// Intrusive lock-free LIFO stack (Treiber stack) of caller-owned `Node`s.
    /// The stack never allocates or frees nodes; `push` links a node the caller
    /// provides and `pop` hands one back.
    ///
    /// NOTE(review): this is only safe if popped nodes are not freed or pushed
    /// again while other threads may still be inside `pop`. See the comment in
    /// `pop` (ABA / reuse hazard).
    pub const LockFreeStack = struct {
        // Pointer to the top node; null when the stack is empty.
        head: std.atomic.Value(?*Node),

        pub const Node = struct {
            data: u32,
            // Next node toward the bottom of the stack; written only by `push`.
            next: ?*Node,
        };

        /// Creates an empty stack.
        pub fn init() LockFreeStack {
            return LockFreeStack{
                .head = std.atomic.Value(?*Node).init(null),
            };
        }

        /// Pushes `node` onto the stack. Retries until the CAS succeeds.
        pub fn push(self: *LockFreeStack, node: *Node) void {
            while (true) {
                const old_head = self.head.load(.acquire);
                // `node` is not yet visible to other threads, so a plain write is fine.
                node.next = old_head;

                // `.release` on success publishes the `node.next` write to any
                // thread that later loads `head` with `.acquire`. A non-null result
                // means another thread changed `head` (or the weak CAS failed
                // spuriously), so retry.
                if (self.head.cmpxchgWeak(old_head, node, .release, .acquire) == null) {
                    break;
                }
            }
        }

        /// Pops the top node, or returns null if the stack is empty.
        pub fn pop(self: *LockFreeStack) ?*Node {
            while (true) {
                const old_head = self.head.load(.acquire) orelse return null;

                // NOTE(review): ABA hazard. Between the load above and this CAS,
                // another thread may pop `old_head` and push it (or another node
                // at the same address) back. The CAS then succeeds but installs
                // a stale `old_head.next`. The read of `old_head.next` itself also
                // races if that node is concurrently re-pushed or freed. Fixing
                // this needs tagged pointers or hazard pointers/epochs.
                if (self.head.cmpxchgWeak(old_head, old_head.next, .release, .acquire) == null) {
                    return old_head;
                }
            }
        }
    };
    

    まとめ

    この章では、Zigの並行処理と非同期プログラミングについて学びました:

  • スレッド: 基本的なスレッド生成と管理
  • 同期プリミティブ: ミューテックス、RwLock、アトミック操作
  • スレッドプール: 効率的なタスク処理
  • 並行パターン: Producer-Consumer、ワーカープール
  • パフォーマンス: キャッシュライン、ロックフリーデータ構造
  • 次の章では、SIMD(Single Instruction Multiple Data)による並列計算について学びます。

    参考文献

  • Zig Standard Library - Thread: https://ziglang.org/documentation/master/std/#std.Thread
  • Concurrent Programming in Zig: https://ziglearn.org/chapter-4/