zig-async - 解答
実装コード
スレッドベースの並行HTTPクライアント
const std = @import("std");
/// Thread-based HTTP client that fetches one URL, or several URLs in parallel.
///
/// Every returned body is allocated with `allocator` and owned by the caller.
/// `fetchAll` runs `fetch` on several threads at once, so the allocator given
/// to `init` must be safe for concurrent use.
pub const AsyncClient = struct {
    allocator: std.mem.Allocator,
    /// Timeout in milliseconds. Stored by `setTimeout`; `fetch` does not
    /// read it at the moment.
    timeout_ms: u64 = 30000,

    /// Creates a client that allocates response bodies with `allocator`.
    pub fn init(allocator: std.mem.Allocator) AsyncClient {
        return .{ .allocator = allocator };
    }

    /// Stores a new timeout value in milliseconds.
    pub fn setTimeout(self: *AsyncClient, ms: u64) void {
        self.timeout_ms = ms;
    }

    /// Fetches a single URL with GET and returns the response body.
    ///
    /// The read is capped at 1 MiB (the limit handed to `readAllArrayList`).
    /// Caller owns the returned slice and frees it with `self.allocator`.
    pub fn fetch(self: *AsyncClient, url: []const u8) ![]u8 {
        const uri = try std.Uri.parse(url);

        var client = std.http.Client{ .allocator = self.allocator };
        defer client.deinit();

        // NOTE(review): the std.http.Client request API has changed between
        // Zig releases (some versions need an explicit start/send call before
        // wait) - confirm this sequence against the targeted Zig version.
        var request = try client.request(.GET, uri, .{}, .{});
        defer request.deinit();
        try request.wait();

        var body = std.ArrayList(u8).init(self.allocator);
        // Release the partially filled buffer if reading the body fails.
        errdefer body.deinit();
        try request.reader().readAllArrayList(&body, 1024 * 1024);
        return body.toOwnedSlice();
    }

    /// Fetches several URLs in parallel, one thread per URL.
    ///
    /// Returns one body per input URL, in input order. A URL whose fetch
    /// failed yields an empty slice (the failure is logged). Caller owns the
    /// outer slice and every element, all allocated with `self.allocator`.
    /// Errors: allocation failure or a failed `std.Thread.spawn`.
    pub fn fetchAll(self: *AsyncClient, urls: []const []const u8) ![][]u8 {
        const results = try self.allocator.alloc([]u8, urls.len);
        errdefer self.allocator.free(results);

        const threads = try self.allocator.alloc(std.Thread, urls.len);
        defer self.allocator.free(threads);

        // Number of workers actually started. If a later spawn fails, those
        // workers still write into `results`, so they must be joined (and
        // their bodies released) before `results` itself is freed.
        var spawned: usize = 0;
        errdefer {
            for (threads[0..spawned]) |thread| thread.join();
            for (results[0..spawned]) |body| self.allocator.free(body);
        }

        // Start one worker per URL.
        for (urls, 0..) |url, i| {
            const context = FetchContext{
                .client = self,
                .url = url,
                .result = &results[i],
            };
            threads[i] = try std.Thread.spawn(.{}, fetchWorker, .{context});
            spawned += 1;
        }

        // Wait for every worker to finish.
        for (threads) |thread| {
            thread.join();
        }
        return results;
    }

    /// Arguments handed to one worker thread.
    const FetchContext = struct {
        client: *AsyncClient,
        url: []const u8,
        /// Slot in the result array that this worker fills in.
        result: *[]u8,
    };

    /// Thread entry point: fetches `ctx.url` and stores the body in
    /// `ctx.result`. On failure the error is logged and an empty slice is
    /// stored, because a worker has no way to propagate the error.
    fn fetchWorker(ctx: FetchContext) void {
        if (ctx.client.fetch(ctx.url)) |body| {
            ctx.result.* = body;
        } else |err| {
            std.log.warn("fetch failed for {s}: {s}", .{ ctx.url, @errorName(err) });
            ctx.result.* = &[_]u8{};
        }
    }
};
非同期ファイルI/O
/// Reads files relative to the current working directory, either one at a
/// time or several in parallel on separate threads.
///
/// Every returned buffer is allocated with `allocator` and owned by the
/// caller. `readFiles` runs `readFile` on several threads at once, so the
/// allocator given to `init` must be safe for concurrent use.
pub const AsyncFileReader = struct {
    allocator: std.mem.Allocator,

    /// Creates a reader that allocates file contents with `allocator`.
    pub fn init(allocator: std.mem.Allocator) AsyncFileReader {
        return .{ .allocator = allocator };
    }

    /// Reads a single file completely and returns its contents.
    ///
    /// The read is capped at 10 MiB (the limit handed to `readToEndAlloc`).
    /// Caller owns the returned slice and frees it with `self.allocator`.
    pub fn readFile(self: *AsyncFileReader, path: []const u8) ![]u8 {
        const file = try std.fs.cwd().openFile(path, .{});
        defer file.close();
        return try file.readToEndAlloc(self.allocator, 10 * 1024 * 1024);
    }

    /// Reads several files in parallel, one thread per path.
    ///
    /// Returns one buffer per input path, in input order. A path that could
    /// not be read yields an empty slice (the failure is logged). Caller owns
    /// the outer slice and every element, all allocated with `self.allocator`.
    /// Errors: allocation failure or a failed `std.Thread.spawn`.
    pub fn readFiles(self: *AsyncFileReader, paths: []const []const u8) ![][]u8 {
        const results = try self.allocator.alloc([]u8, paths.len);
        errdefer self.allocator.free(results);

        const threads = try self.allocator.alloc(std.Thread, paths.len);
        defer self.allocator.free(threads);

        // Number of workers actually started. If a later spawn fails, those
        // workers still write into `results`, so they must be joined (and
        // their buffers released) before `results` itself is freed.
        var spawned: usize = 0;
        errdefer {
            for (threads[0..spawned]) |thread| thread.join();
            for (results[0..spawned]) |contents| self.allocator.free(contents);
        }

        for (paths, 0..) |path, i| {
            const context = ReadContext{
                .reader = self,
                .path = path,
                .result = &results[i],
            };
            threads[i] = try std.Thread.spawn(.{}, readWorker, .{context});
            spawned += 1;
        }

        for (threads) |thread| {
            thread.join();
        }
        return results;
    }

    /// Arguments handed to one worker thread.
    const ReadContext = struct {
        reader: *AsyncFileReader,
        path: []const u8,
        /// Slot in the result array that this worker fills in.
        result: *[]u8,
    };

    /// Thread entry point: reads `ctx.path` and stores the contents in
    /// `ctx.result`. On failure the error is logged and an empty slice is
    /// stored, because a worker has no way to propagate the error.
    fn readWorker(ctx: ReadContext) void {
        if (ctx.reader.readFile(ctx.path)) |contents| {
            ctx.result.* = contents;
        } else |err| {
            std.log.warn("read failed for {s}: {s}", .{ ctx.path, @errorName(err) });
            ctx.result.* = &[_]u8{};
        }
    }
};
イベントループ
/// Minimal single-threaded event loop that fires one-shot timers.
///
/// Timers are kept in insertion order in `timers`; `run` polls the wall
/// clock roughly once per millisecond and invokes every callback whose
/// deadline has passed.
pub const EventLoop = struct {
    timers: std.ArrayList(Timer),
    /// Cleared by `stop`; `run` returns once it is false.
    running: bool = true,
    allocator: std.mem.Allocator,

    /// A pending one-shot timer.
    const Timer = struct {
        callback: *const fn () void,
        /// Absolute deadline, in `std.time.milliTimestamp()` units.
        fire_at: i64,
    };

    /// Creates an empty loop whose timer list allocates with `allocator`.
    pub fn init(allocator: std.mem.Allocator) EventLoop {
        return .{
            .timers = std.ArrayList(Timer).init(allocator),
            .allocator = allocator,
        };
    }

    /// Releases the timer list. Pending timers are dropped without firing.
    pub fn deinit(self: *EventLoop) void {
        self.timers.deinit();
    }

    /// Schedules `callback` to run once, `ms` milliseconds from now.
    ///
    /// Delays too large for the `i64` timestamp saturate at the maximum
    /// representable deadline instead of overflowing.
    /// Errors: allocation failure while growing the timer list.
    pub fn setTimeout(self: *EventLoop, callback: *const fn () void, ms: u64) !void {
        const delay: i64 = std.math.cast(i64, ms) orelse std.math.maxInt(i64);
        const fire_at = std.time.milliTimestamp() +| delay;
        try self.timers.append(.{
            .callback = callback,
            .fire_at = fire_at,
        });
    }

    /// Runs until no timers remain or `stop` has been called.
    ///
    /// Expired timers are removed before their callback is invoked. When a
    /// callback calls `stop`, no further callbacks are invoked in that pass.
    pub fn run(self: *EventLoop) void {
        while (self.running and self.timers.items.len > 0) {
            const now = std.time.milliTimestamp();

            // Fire expired timers. The length and `running` are re-read on
            // every step because a callback may add timers or stop the loop.
            var i: usize = 0;
            while (self.running and i < self.timers.items.len) {
                if (self.timers.items[i].fire_at <= now) {
                    const timer = self.timers.orderedRemove(i);
                    timer.callback();
                } else {
                    i += 1;
                }
            }

            // Nothing left to wait for: return without the trailing sleep.
            if (!self.running or self.timers.items.len == 0) break;

            // Short sleep between polls.
            std.time.sleep(1 * std.time.ns_per_ms);
        }
    }

    /// Asks `run` to return. `running` is a plain bool with no
    /// synchronization.
    pub fn stop(self: *EventLoop) void {
        self.running = false;
    }
};
ボーナス: チャンネル実装
/// Returns a bounded FIFO channel type carrying values of type `T`.
///
/// All operations take the channel's mutex. `send` blocks while the buffer
/// holds `capacity` items and `receive` blocks while it is empty. With a
/// `capacity` of 0 the buffer always counts as full, so `send` can only
/// return through `close`.
pub fn Channel(comptime T: type) type {
    return struct {
        const Self = @This();

        buffer: std.ArrayList(T),
        mutex: std.Thread.Mutex = .{},
        /// Signalled when an item is added or the channel is closed.
        not_empty: std.Thread.Condition = .{},
        /// Signalled when an item is removed or the channel is closed.
        not_full: std.Thread.Condition = .{},
        capacity: usize,
        closed: bool = false,

        /// Creates an open, empty channel holding at most `capacity` items.
        pub fn init(allocator: std.mem.Allocator, capacity: usize) Self {
            return .{
                .buffer = std.ArrayList(T).init(allocator),
                .capacity = capacity,
            };
        }

        /// Releases the buffer. Items still queued are dropped.
        pub fn deinit(self: *Self) void {
            self.buffer.deinit();
        }

        /// Appends `value`, blocking while the channel is full.
        ///
        /// Errors: `error.ChannelClosed` if the channel is closed on entry
        /// or is closed while waiting for space; allocation failure from
        /// growing the buffer.
        pub fn send(self: *Self, value: T) !void {
            self.mutex.lock();
            defer self.mutex.unlock();

            while (!self.closed and self.buffer.items.len >= self.capacity) {
                self.not_full.wait(&self.mutex);
            }
            // Checked after the wait loop so that a closed channel rejects
            // the value even when the buffer still has room.
            if (self.closed) return error.ChannelClosed;

            try self.buffer.append(value);
            self.not_empty.signal();
        }

        /// Removes and returns the oldest item, blocking while the channel
        /// is empty.
        ///
        /// Items queued before `close` can still be received afterwards.
        /// Errors: `error.ChannelClosed` once the channel is closed and empty.
        pub fn receive(self: *Self) !T {
            self.mutex.lock();
            defer self.mutex.unlock();

            while (self.buffer.items.len == 0) {
                if (self.closed) return error.ChannelClosed;
                self.not_empty.wait(&self.mutex);
            }

            const value = self.buffer.orderedRemove(0);
            self.not_full.signal();
            return value;
        }

        /// Marks the channel closed and wakes every blocked sender and
        /// receiver.
        pub fn close(self: *Self) void {
            self.mutex.lock();
            defer self.mutex.unlock();

            self.closed = true;
            self.not_empty.broadcast();
            self.not_full.broadcast();
        }
    };
}
テストコード
const testing = std.testing;
test "AsyncClient fetchAll" {
    // A real concurrency check needs a mock server (start a local one in a
    // full test setup). Without one, this exercises the only network-free
    // path: an empty URL list must produce an empty result list.
    var client = AsyncClient.init(testing.allocator);
    const urls = [_][]const u8{};

    const start = std.time.milliTimestamp();
    const results = try client.fetchAll(&urls);
    defer testing.allocator.free(results);
    const elapsed = std.time.milliTimestamp() - start;

    try testing.expectEqual(@as(usize, 0), results.len);

    // With real URLs, parallel fetching should take less than the sum of
    // the individual request times.
    std.debug.print("Elapsed: {}ms\n", .{elapsed});
}
test "EventLoop timer" {
    var loop = EventLoop.init(testing.allocator);
    defer loop.deinit();

    // The callback type is a plain `fn () void` and cannot capture locals,
    // so the counter lives in a container-level variable instead.
    const Probe = struct {
        var counter: usize = 0;
        fn cb() void {
            counter += 1;
        }
    };
    Probe.counter = 0;

    try loop.setTimeout(Probe.cb, 10);
    loop.run();

    // run() returns only after the single timer has fired exactly once.
    try testing.expectEqual(@as(usize, 1), Probe.counter);
}
test "Channel send receive" {
    // Round-trip one value through a channel with room for ten items.
    const IntChannel = Channel(i32);
    var ch = IntChannel.init(testing.allocator, 10);
    defer ch.deinit();

    try ch.send(42);
    try testing.expectEqual(@as(i32, 42), try ch.receive());
}