zig-async - 解説

実装の詳細

スレッドベースの並行処理

Zigの現在のバージョン(0.11+)では、async/awaitが一時的に無効化されているため、スレッドを使用します。

// スレッドの起動
const thread = try std.Thread.spawn(.{}, workerFn, .{arg});

// スレッドの完了を待機
thread.join();

// デタッチ(バックグラウンド実行)
thread.detach();

コンテキストの受け渡し

const Context = struct {
    input: []const u8,
    result: *[]u8,
    allocator: std.mem.Allocator,
};

fn worker(ctx: Context) void {
    // ctx.resultに結果を書き込む
    ctx.result.* = processData(ctx.input, ctx.allocator) catch &[_]u8{};
}

同期プリミティブ

// Mutex(相互排他ロック)
var mutex = std.Thread.Mutex{};
mutex.lock();
defer mutex.unlock();
// クリティカルセクション

// 条件変数
var condition = std.Thread.Condition{};
condition.wait(&mutex);    // 待機
condition.signal();        // 1つのスレッドを起床
condition.broadcast();     // 全スレッドを起床

よくある間違い

1. データ競合

// 間違い: 共有データへの非同期アクセス
var counter: usize = 0;

fn worker() void {
    counter += 1;  // データ競合!
}

// 正しい: Mutex で保護
var mutex = std.Thread.Mutex{};

fn workerSafe() void {
    mutex.lock();
    defer mutex.unlock();
    counter += 1;
}

// または: Atomic 操作
var atomic_counter = std.atomic.Value(usize).init(0);

fn workerAtomic() void {
    _ = atomic_counter.fetchAdd(1, .seq_cst);
}

2. デッドロック

// 間違い: ロックの順序が不定
fn funcA() void {
    mutex1.lock();
    mutex2.lock();  // funcBが逆順でロックするとデッドロック
    // ...
}

fn funcB() void {
    mutex2.lock();
    mutex1.lock();
    // ...
}

// 正しい: 常に同じ順序でロック
fn funcASafe() void {
    mutex1.lock();
    defer mutex1.unlock();
    mutex2.lock();
    defer mutex2.unlock();
    // ...
}

fn funcBSafe() void {
    mutex1.lock();  // 同じ順序
    defer mutex1.unlock();
    mutex2.lock();
    defer mutex2.unlock();
    // ...
}

3. リソースリーク

// 間違い: スレッドを join/detach しない
fn leaky() !void {
    const thread = try std.Thread.spawn(.{}, worker, .{});
    // thread は放置...
}

// 正しい: 必ず join または detach
fn correct() !void {
    const thread = try std.Thread.spawn(.{}, worker, .{});
    defer thread.join();
    // ...
}

パフォーマンス考慮事項

スレッド数の選択

// CPUコア数を取得
const cpu_count = try std.Thread.getCpuCount();

// I/Oバウンドタスク: コア数 × 2〜4
const io_threads = cpu_count * 2;

// CPUバウンドタスク: コア数
const cpu_threads = cpu_count;

スレッドプール

pub const ThreadPool = struct {
    workers: []std.Thread,
    queue: std.atomic.Queue(Task),
    shutdown: std.atomic.Value(bool),

    pub fn init(allocator: std.mem.Allocator, size: usize) !ThreadPool {
        var pool = ThreadPool{
            .workers = try allocator.alloc(std.Thread, size),
            .queue = std.atomic.Queue(Task).init(),
            .shutdown = std.atomic.Value(bool).init(false),
        };

        for (pool.workers) |*worker| {
            worker.* = try std.Thread.spawn(.{}, runWorker, .{&pool});
        }

        return pool;
    }

    fn runWorker(pool: *ThreadPool) void {
        while (!pool.shutdown.load(.seq_cst)) {
            if (pool.queue.pop()) |task| {
                task.execute();
            } else {
                std.time.sleep(1 * std.time.ns_per_ms);
            }
        }
    }
};

イベントループの詳細

効率的なタイマー管理

// 優先度キューを使用(次に発火するタイマーを効率的に取得)
const TimerQueue = std.PriorityQueue(Timer, void, compareTimers);

fn compareTimers(_: void, a: Timer, b: Timer) std.math.Order {
    return std.math.order(a.fire_at, b.fire_at);
}

I/O多重化

// epoll/kqueue を使用したI/O多重化
const Poller = struct {
    fd: std.posix.fd_t,

    pub fn init() !Poller {
        return .{
            .fd = try std.posix.epoll_create1(0),
        };
    }

    pub fn add(self: *Poller, fd: std.posix.fd_t, events: u32) !void {
        var ev = std.linux.epoll_event{
            .events = events,
            .data = .{ .fd = fd },
        };
        try std.posix.epoll_ctl(self.fd, .ADD, fd, &ev);
    }

    pub fn wait(self: *Poller, events: []std.linux.epoll_event, timeout: i32) !usize {
        return std.posix.epoll_wait(self.fd, events, timeout);
    }
};

発展トピック

ワークスティーリング

┌─────────┐  ┌─────────┐  ┌─────────┐
│ Worker1 │  │ Worker2 │  │ Worker3 │
│ [T][T]  │  │ [T][T]  │  │ []      │ ← 仕事がない
└─────────┘  └─────────┘  └────┬────┘
                              │
                              │ スティール
                              ▼
                         ┌─────────┐
                         │ Worker1 │
                         │ [T]     │ ← タスクを奪う
                         └─────────┘

構造化並行性

// 親タスクが完了する前に子タスクが完了することを保証
pub fn withScope(comptime f: fn (*Scope) void) void {
    var scope = Scope.init();
    defer scope.waitAll();

    f(&scope);
}

// 使用例
withScope(struct {
    fn run(scope: *Scope) void {
        scope.spawn(task1, .{});
        scope.spawn(task2, .{});
        // スコープ終了時に両方のタスクの完了を待機
    }
}.run);