zig-async - 解説
実装の詳細
スレッドベースの並行処理
Zigの現在のバージョン(0.11+)では、async/awaitが一時的に無効化されているため、スレッドを使用します。
// スレッドの起動
const thread = try std.Thread.spawn(.{}, workerFn, .{arg});
// スレッドの完了を待機
thread.join();
// デタッチ(バックグラウンド実行)
thread.detach();
コンテキストの受け渡し
const Context = struct {
input: []const u8,
result: *[]u8,
allocator: std.mem.Allocator,
};
fn worker(ctx: Context) void {
// ctx.resultに結果を書き込む
ctx.result.* = processData(ctx.input, ctx.allocator) catch &[_]u8{};
}
同期プリミティブ
// Mutex(相互排他ロック)
var mutex = std.Thread.Mutex{};
mutex.lock();
defer mutex.unlock();
// クリティカルセクション
// 条件変数
var condition = std.Thread.Condition{};
condition.wait(&mutex); // 待機
condition.signal(); // 1つのスレッドを起床
condition.broadcast(); // 全スレッドを起床
よくある間違い
1. データ競合
// 間違い: 共有データへの非同期アクセス
var counter: usize = 0;
fn worker() void {
counter += 1; // データ競合!
}
// 正しい: Mutex で保護
var mutex = std.Thread.Mutex{};
fn workerSafe() void {
mutex.lock();
defer mutex.unlock();
counter += 1;
}
// または: Atomic 操作
var atomic_counter = std.atomic.Value(usize).init(0);
fn workerAtomic() void {
_ = atomic_counter.fetchAdd(1, .seq_cst);
}
2. デッドロック
// 間違い: ロックの順序が不定
fn funcA() void {
mutex1.lock();
mutex2.lock(); // funcBが逆順でロックするとデッドロック
// ...
}
fn funcB() void {
mutex2.lock();
mutex1.lock();
// ...
}
// 正しい: 常に同じ順序でロック
fn funcASafe() void {
mutex1.lock();
defer mutex1.unlock();
mutex2.lock();
defer mutex2.unlock();
// ...
}
fn funcBSafe() void {
mutex1.lock(); // 同じ順序
defer mutex1.unlock();
mutex2.lock();
defer mutex2.unlock();
// ...
}
3. リソースリーク
// 間違い: スレッドを join/detach しない
fn leaky() !void {
const thread = try std.Thread.spawn(.{}, worker, .{});
// thread は放置...
}
// 正しい: 必ず join または detach
fn correct() !void {
const thread = try std.Thread.spawn(.{}, worker, .{});
defer thread.join();
// ...
}
パフォーマンス考慮事項
スレッド数の選択
// CPUコア数を取得
const cpu_count = try std.Thread.getCpuCount();
// I/Oバウンドタスク: コア数 × 2〜4
const io_threads = cpu_count * 2;
// CPUバウンドタスク: コア数
const cpu_threads = cpu_count;
スレッドプール
pub const ThreadPool = struct {
workers: []std.Thread,
queue: std.atomic.Queue(Task),
shutdown: std.atomic.Value(bool),
pub fn init(allocator: std.mem.Allocator, size: usize) !ThreadPool {
var pool = ThreadPool{
.workers = try allocator.alloc(std.Thread, size),
.queue = std.atomic.Queue(Task).init(),
.shutdown = std.atomic.Value(bool).init(false),
};
for (pool.workers) |*worker| {
worker.* = try std.Thread.spawn(.{}, runWorker, .{&pool});
}
return pool;
}
fn runWorker(pool: *ThreadPool) void {
while (!pool.shutdown.load(.seq_cst)) {
if (pool.queue.pop()) |task| {
task.execute();
} else {
std.time.sleep(1 * std.time.ns_per_ms);
}
}
}
};
イベントループの詳細
効率的なタイマー管理
// 優先度キューを使用(次に発火するタイマーを効率的に取得)
const TimerQueue = std.PriorityQueue(Timer, void, compareTimers);
fn compareTimers(_: void, a: Timer, b: Timer) std.math.Order {
return std.math.order(a.fire_at, b.fire_at);
}
I/O多重化
// epoll/kqueue を使用したI/O多重化
const Poller = struct {
fd: std.posix.fd_t,
pub fn init() !Poller {
return .{
.fd = try std.posix.epoll_create1(0),
};
}
pub fn add(self: *Poller, fd: std.posix.fd_t, events: u32) !void {
var ev = std.linux.epoll_event{
.events = events,
.data = .{ .fd = fd },
};
try std.posix.epoll_ctl(self.fd, .ADD, fd, &ev);
}
pub fn wait(self: *Poller, events: []std.linux.epoll_event, timeout: i32) !usize {
return std.posix.epoll_wait(self.fd, events, timeout);
}
};
発展トピック
ワークスティーリング
┌─────────┐ ┌─────────┐ ┌─────────┐
│ Worker1 │ │ Worker2 │ │ Worker3 │
│ [T][T] │ │ [T][T] │ │ [] │ ← 仕事がない
└─────────┘ └─────────┘ └────┬────┘
│
│ スティール
▼
┌─────────┐
│ Worker1 │
│ [T] │ ← タスクを奪う
└─────────┘
構造化並行性
// 親タスクが完了する前に子タスクが完了することを保証
pub fn withScope(comptime f: fn (*Scope) void) void {
var scope = Scope.init();
defer scope.waitAll();
f(&scope);
}
// 使用例
withScope(struct {
fn run(scope: *Scope) void {
scope.spawn(task1, .{});
scope.spawn(task2, .{});
// スコープ終了時に両方のタスクの完了を待機
}
}.run);