zig-async - 解答

実装コード

スレッドベースの並行HTTPクライアント

const std = @import("std");

/// Thread-per-request HTTP client that can fetch several URLs concurrently.
///
/// NOTE(review): the `std.http` calls below follow the Zig 0.11 standard
/// library API (`Client.request` taking a `Headers` value, then
/// `start`/`wait`) — confirm against the toolchain actually in use.
pub const AsyncClient = struct {
    /// Allocator for response bodies and for `fetchAll` bookkeeping.
    /// `fetchAll` uses it from several worker threads at the same time, so it
    /// must be safe for concurrent use.
    allocator: std.mem.Allocator,
    /// Timeout in milliseconds as configured through `setTimeout`.
    /// It is only stored; no request in this struct reads it yet.
    timeout_ms: u64 = 30000,

    /// Upper bound on the size of a single response body read by `fetch`.
    const max_body_bytes: usize = 1024 * 1024;

    /// Creates a client that allocates with `allocator`.
    pub fn init(allocator: std.mem.Allocator) AsyncClient {
        return .{ .allocator = allocator };
    }

    /// Stores the timeout, in milliseconds.
    pub fn setTimeout(self: *AsyncClient, ms: u64) void {
        self.timeout_ms = ms;
    }

    /// Fetches a single URL with a GET request and returns the response body.
    ///
    /// Caller owns the returned slice and frees it with `self.allocator`.
    /// Fails if the URL does not parse, if the request fails, or if the body
    /// is larger than `max_body_bytes`.
    pub fn fetch(self: *AsyncClient, url: []const u8) ![]u8 {
        const uri = try std.Uri.parse(url);

        var client = std.http.Client{ .allocator = self.allocator };
        defer client.deinit();

        // `Headers` has no default for its allocator, so it cannot be `.{}`.
        var headers = std.http.Headers{ .allocator = self.allocator };
        defer headers.deinit();

        var request = try client.request(.GET, uri, headers, .{});
        defer request.deinit();

        // The request must be sent before a response can be awaited.
        try request.start();
        try request.wait();

        var body = std.ArrayList(u8).init(self.allocator);
        // Release the partially read body if reading fails or hits the limit.
        errdefer body.deinit();
        try request.reader().readAllArrayList(&body, max_body_bytes);

        return body.toOwnedSlice();
    }

    /// Fetches every URL in `urls` concurrently, one thread per URL.
    ///
    /// Returns one body per URL, in the same order. A URL whose fetch failed
    /// yields an empty slice (the failure is logged as a warning). Caller owns
    /// the outer slice and every element; free each element and then the
    /// outer slice with `self.allocator` (freeing a zero-length slice is a
    /// no-op for `std.mem.Allocator.free`).
    ///
    /// Fails only if bookkeeping memory cannot be allocated or a thread
    /// cannot be spawned; in that case all already started workers are joined
    /// and their results are released before returning.
    pub fn fetchAll(self: *AsyncClient, urls: []const []const u8) ![][]u8 {
        const results = try self.allocator.alloc([]u8, urls.len);
        errdefer self.allocator.free(results);

        const threads = try self.allocator.alloc(std.Thread, urls.len);
        defer self.allocator.free(threads);

        // Number of workers actually started. On a spawn failure those workers
        // still hold pointers into `results`, so they must finish before
        // `results` is freed by the errdefer above (errdefers run in reverse).
        var spawned: usize = 0;
        errdefer {
            for (threads[0..spawned]) |thread| thread.join();
            for (results[0..spawned]) |body| self.allocator.free(body);
        }

        // Start one worker per URL.
        for (urls, 0..) |url, i| {
            threads[i] = try std.Thread.spawn(.{}, fetchWorker, .{FetchContext{
                .client = self,
                .url = url,
                .result = &results[i],
            }});
            spawned += 1;
        }

        // Wait for every worker to finish.
        for (threads) |thread| thread.join();

        return results;
    }

    /// Arguments handed to one worker thread.
    const FetchContext = struct {
        client: *AsyncClient,
        url: []const u8,
        /// Slot in the caller's result slice that this worker fills in.
        result: *[]u8,
    };

    /// Thread entry point: fetches `ctx.url` and stores the body in
    /// `ctx.result`, or an empty slice if the fetch failed.
    fn fetchWorker(ctx: FetchContext) void {
        ctx.result.* = ctx.client.fetch(ctx.url) catch |err| failed: {
            std.log.warn("fetch of {s} failed: {s}", .{ ctx.url, @errorName(err) });
            const empty: []u8 = &[_]u8{};
            break :failed empty;
        };
    }
};

非同期ファイルI/O

/// Reads files from the current working directory, optionally several at once
/// using one thread per file.
pub const AsyncFileReader = struct {
    /// Allocator for file contents and for `readFiles` bookkeeping.
    /// `readFiles` uses it from several worker threads at the same time, so it
    /// must be safe for concurrent use.
    allocator: std.mem.Allocator,

    /// Largest file `readFile` will load.
    const max_file_bytes: usize = 10 * 1024 * 1024;

    /// Creates a reader that allocates with `allocator`.
    pub fn init(allocator: std.mem.Allocator) AsyncFileReader {
        return .{ .allocator = allocator };
    }

    /// Reads the whole file at `path` (resolved against the current working
    /// directory).
    ///
    /// Caller owns the returned slice and frees it with `self.allocator`.
    /// Fails if the file cannot be opened or read, or if it is larger than
    /// `max_file_bytes`.
    pub fn readFile(self: *AsyncFileReader, path: []const u8) ![]u8 {
        const file = try std.fs.cwd().openFile(path, .{});
        defer file.close();

        return try file.readToEndAlloc(self.allocator, max_file_bytes);
    }

    /// Reads every file in `paths` concurrently, one thread per file.
    ///
    /// Returns one buffer per path, in the same order. A path that could not
    /// be read yields an empty slice (the failure is logged as a warning).
    /// Caller owns the outer slice and every element; free each element and
    /// then the outer slice with `self.allocator` (freeing a zero-length slice
    /// is a no-op for `std.mem.Allocator.free`).
    ///
    /// Fails only if bookkeeping memory cannot be allocated or a thread
    /// cannot be spawned; in that case all already started workers are joined
    /// and their results are released before returning.
    pub fn readFiles(self: *AsyncFileReader, paths: []const []const u8) ![][]u8 {
        const results = try self.allocator.alloc([]u8, paths.len);
        errdefer self.allocator.free(results);

        const threads = try self.allocator.alloc(std.Thread, paths.len);
        defer self.allocator.free(threads);

        // Number of workers actually started. On a spawn failure those workers
        // still hold pointers into `results`, so they must finish before
        // `results` is freed by the errdefer above (errdefers run in reverse).
        var spawned: usize = 0;
        errdefer {
            for (threads[0..spawned]) |thread| thread.join();
            for (results[0..spawned]) |contents| self.allocator.free(contents);
        }

        for (paths, 0..) |path, i| {
            threads[i] = try std.Thread.spawn(.{}, readWorker, .{ReadContext{
                .reader = self,
                .path = path,
                .result = &results[i],
            }});
            spawned += 1;
        }

        for (threads) |thread| thread.join();

        return results;
    }

    /// Arguments handed to one worker thread.
    const ReadContext = struct {
        reader: *AsyncFileReader,
        path: []const u8,
        /// Slot in the caller's result slice that this worker fills in.
        result: *[]u8,
    };

    /// Thread entry point: reads `ctx.path` and stores the contents in
    /// `ctx.result`, or an empty slice if reading failed.
    fn readWorker(ctx: ReadContext) void {
        ctx.result.* = ctx.reader.readFile(ctx.path) catch |err| failed: {
            std.log.warn("reading {s} failed: {s}", .{ ctx.path, @errorName(err) });
            const empty: []u8 = &[_]u8{};
            break :failed empty;
        };
    }
};

イベントループ

/// Minimal single-threaded timer loop: callbacks are registered with a delay
/// and `run` polls the wall clock until every timer has fired or `stop` is
/// called.
pub const EventLoop = struct {
    /// Pending timers, in registration order.
    timers: std.ArrayList(Timer),
    /// Cleared by `stop`. Nothing sets it back to true, so once stopped a
    /// later `run` returns immediately.
    running: bool = true,
    allocator: std.mem.Allocator,

    /// One pending callback and the wall-clock time at which it becomes due.
    const Timer = struct {
        callback: *const fn () void,
        /// Due time in milliseconds, on the `std.time.milliTimestamp` clock.
        fire_at: i64,
    };

    /// Creates an empty loop whose timer list allocates with `allocator`.
    pub fn init(allocator: std.mem.Allocator) EventLoop {
        return .{
            .timers = std.ArrayList(Timer).init(allocator),
            .allocator = allocator,
        };
    }

    /// Releases the timer list. Pending timers are dropped without firing.
    pub fn deinit(self: *EventLoop) void {
        self.timers.deinit();
    }

    /// Schedules `callback` to run once, `ms` milliseconds from now.
    ///
    /// Delays too large for the `i64` timestamp are clamped to the latest
    /// representable time instead of overflowing. Fails only if the timer
    /// list cannot grow.
    pub fn setTimeout(self: *EventLoop, callback: *const fn () void, ms: u64) !void {
        // `@intCast` would be illegal for ms > maxInt(i64); clamp instead.
        const delay = std.math.cast(i64, ms) orelse std.math.maxInt(i64);
        // Saturating add so a huge delay cannot overflow the timestamp.
        const fire_at = std.time.milliTimestamp() +| delay;
        try self.timers.append(.{
            .callback = callback,
            .fire_at = fire_at,
        });
    }

    /// Runs due timers until none are left or `stop` has been called.
    ///
    /// Timers that are due in the same pass fire in registration order. A
    /// callback may register new timers or stop the loop; after `stop`, no
    /// further callbacks run.
    pub fn run(self: *EventLoop) void {
        while (self.running and self.timers.items.len > 0) {
            const now = std.time.milliTimestamp();

            // Fire every timer that is due. The index only advances past
            // timers that stay in the list, because removal shifts the rest
            // down. `running` is rechecked so a callback that calls `stop`
            // halts the pass.
            var i: usize = 0;
            while (self.running and i < self.timers.items.len) {
                if (self.timers.items[i].fire_at <= now) {
                    const timer = self.timers.orderedRemove(i);
                    timer.callback();
                } else {
                    i += 1;
                }
            }

            // Nothing left to wait for: return without an extra sleep.
            if (!self.running or self.timers.items.len == 0) break;

            // Short sleep between polls.
            std.time.sleep(1 * std.time.ns_per_ms);
        }
    }

    /// Asks `run` to return; pending timers stay in the list.
    pub fn stop(self: *EventLoop) void {
        self.running = false;
    }
};

ボーナス: チャンネル実装

/// Bonus: returns a bounded, blocking, multi-producer/multi-consumer FIFO
/// channel type carrying values of type `T`, guarded by a mutex and two
/// condition variables.
pub fn Channel(comptime T: type) type {
    return struct {
        const Self = @This();

        /// Queued values, oldest first.
        buffer: std.ArrayList(T),
        /// Protects every other field.
        mutex: std.Thread.Mutex = .{},
        /// Signalled when a value is queued or the channel is closed.
        not_empty: std.Thread.Condition = .{},
        /// Signalled when a value is dequeued or the channel is closed.
        not_full: std.Thread.Condition = .{},
        /// Maximum number of queued values. With a capacity of 0 the buffer
        /// is always considered full, so `send` can never succeed.
        capacity: usize,
        closed: bool = false,

        /// Creates an open, empty channel holding at most `capacity` values.
        pub fn init(allocator: std.mem.Allocator, capacity: usize) Self {
            return .{
                .buffer = std.ArrayList(T).init(allocator),
                .capacity = capacity,
            };
        }

        /// Releases the buffer. Values still queued are discarded.
        pub fn deinit(self: *Self) void {
            self.buffer.deinit();
        }

        /// Queues `value`, blocking while the channel is full.
        ///
        /// Returns `error.ChannelClosed` if the channel is closed, whether it
        /// was already closed on entry or got closed while waiting for room.
        /// Also fails if the buffer cannot grow.
        pub fn send(self: *Self, value: T) !void {
            self.mutex.lock();
            defer self.mutex.unlock();

            while (true) {
                // Checked on every pass, not only while full: a closed
                // channel must reject sends even when it has room.
                if (self.closed) return error.ChannelClosed;
                if (self.buffer.items.len < self.capacity) break;
                self.not_full.wait(&self.mutex);
            }

            try self.buffer.append(value);
            self.not_empty.signal();
        }

        /// Removes and returns the oldest value, blocking while the channel
        /// is empty.
        ///
        /// Values queued before `close` can still be received; once the
        /// channel is both closed and empty this returns
        /// `error.ChannelClosed`.
        pub fn receive(self: *Self) !T {
            self.mutex.lock();
            defer self.mutex.unlock();

            while (self.buffer.items.len == 0) {
                if (self.closed) return error.ChannelClosed;
                self.not_empty.wait(&self.mutex);
            }

            const value = self.buffer.orderedRemove(0);
            self.not_full.signal();
            return value;
        }

        /// Marks the channel closed and wakes every blocked sender and
        /// receiver so they can observe the closed state.
        pub fn close(self: *Self) void {
            self.mutex.lock();
            defer self.mutex.unlock();

            self.closed = true;
            self.not_empty.broadcast();
            self.not_full.broadcast();
        }
    };
}

テストコード

const testing = std.testing;

test "AsyncClient fetchAll" {
    // Fetching real URLs needs a mock or local HTTP server, so this test only
    // covers the network-free path: an empty URL list starts no workers and
    // yields an empty result slice.
    var client = AsyncClient.init(testing.allocator);

    const urls = [_][]const u8{};
    const results = try client.fetchAll(&urls);
    defer testing.allocator.free(results);

    try testing.expectEqual(@as(usize, 0), results.len);
}

test "EventLoop timer" {
    var loop = EventLoop.init(testing.allocator);
    defer loop.deinit();

    // Callbacks are plain `fn () void` and cannot capture locals, so the
    // counter lives in a container-level variable the callback can reach.
    const Probe = struct {
        var fired: usize = 0;

        fn cb() void {
            fired += 1;
        }
    };
    Probe.fired = 0;

    try loop.setTimeout(Probe.cb, 10);

    loop.run();

    // The timer fired exactly once and was removed from the loop.
    try testing.expectEqual(@as(usize, 1), Probe.fired);
    try testing.expectEqual(@as(usize, 0), loop.timers.items.len);
}

test "Channel send receive" {
    // Round-trip a single value through a buffered channel.
    const IntChannel = Channel(i32);
    var ch = IntChannel.init(testing.allocator, 10);
    defer ch.deinit();

    try ch.send(42);

    const received = try ch.receive();
    try testing.expectEqual(@as(i32, 42), received);
}