第16章: 非同期と並行処理
学習目標
この章を終えると、以下ができるようになります:
- Zigのスレッドを作成し、並行処理を実装する
- ミューテックスとアトミック操作でデータ競合を防ぐ
- async/awaitの基本概念を理解する(実験的機能)
- スレッドプールを使用して効率的な並行処理を実装する
- イベントループの基本を理解する
- パフォーマンス向上: 複数のCPUコアを活用
- レスポンシブ性: I/O待機中も他の処理を実行
- スケーラビリティ: 負荷に応じて処理を分散
並行処理の基礎
なぜ並行処理が必要か
現代のコンピュータはマルチコアプロセッサを搭載しており、並行処理により複数のコアを活用して処理を高速化したり、I/O待機中に別の処理を進めたりできます。次の図は、タスクを順番に実行するシングルスレッドと、複数のタスクを同時に実行するマルチスレッドを比較したものです:
シングルスレッド: マルチスレッド:
┌─────────┐ ┌─────────┐
│ Task 1 │ │ Task 1 │
├─────────┤ ├─────────┤
│ Task 2 │ │ Task 2 │ (並行実行)
├─────────┤ ├─────────┤
│ Task 3 │ │ Task 3 │
└─────────┘ └─────────┘
時間 → 時間 →
並行性 vs 並列性
重要な違い:
// 並行性: 1つのコアで複数タスクを切り替え
// 並列性: 複数のコアで同時実行
Zigのスレッド
スレッドの作成
const std = @import("std");
/// Thread entry point: reports the value behind `arg`, performs five
/// simulated work steps of 100 ms each, then reports completion.
fn threadFunction(arg: *u32) void {
    const initial = arg.*;
    std.debug.print("Thread started with arg: {}\n", .{initial});
    // Simulated work loop.
    for (0..5) |step| {
        std.debug.print("Thread working: {}\n", .{step});
        std.time.sleep(100 * std.time.ns_per_ms);
    }
    std.debug.print("Thread finished\n", .{});
}
/// Spawns one thread running `threadFunction`, does some work on the calling
/// thread, then blocks until the spawned thread has finished.
pub fn basicThreadExample() !void {
    var value: u32 = 42;
    const handle = try std.Thread.spawn(.{}, threadFunction, .{&value});

    // Work performed by the main thread while the worker runs.
    std.debug.print("Main thread working\n", .{});

    // The worker holds `&value`, so it must be joined before this frame ends.
    handle.join();
    std.debug.print("All threads completed\n", .{});
}
複数のスレッド
const std = @import("std");
/// CPU-bound demo worker: sums 0..999_999 and prints the total tagged with `id`.
fn worker(id: usize) void {
    std.debug.print("Worker {} started\n", .{id});
    const total = blk: {
        var acc: usize = 0;
        for (0..1_000_000) |n| acc += n;
        break :blk acc;
    };
    std.debug.print("Worker {} finished with sum: {}\n", .{ id, total });
}
/// Starts four threads, each running `worker` with its own index, and joins
/// them all.
///
/// If a `spawn` fails part-way, the threads that were already started are
/// joined before the error is returned, so no thread outlives this call.
pub fn multipleThreadsExample() !void {
    const num_threads = 4;
    var threads: [num_threads]std.Thread = undefined;
    // Number of leading entries in `threads` that hold a live thread handle.
    var spawned: usize = 0;
    errdefer {
        for (threads[0..spawned]) |thread| thread.join();
    }

    // Start every thread.
    for (&threads, 0..) |*thread, i| {
        thread.* = try std.Thread.spawn(.{}, worker, .{i});
        spawned += 1;
    }

    // Wait for every thread to finish.
    for (threads) |thread| {
        thread.join();
    }
    std.debug.print("All workers completed\n", .{});
}
スレッド間のデータ共有
スレッド間でデータを共有する場合、データ競合に注意が必要です:
const std = @import("std");
/// Counter guarded by a mutex so that several threads can update it safely.
const SharedData = struct {
    counter: u32,
    mutex: std.Thread.Mutex,

    /// Returns a counter at zero with an unlocked mutex.
    pub fn init() SharedData {
        return .{ .counter = 0, .mutex = .{} };
    }

    /// Adds one to the counter while holding the lock.
    pub fn increment(self: *SharedData) void {
        self.mutex.lock();
        defer self.mutex.unlock();
        self.counter += 1;
    }

    /// Reads the counter while holding the lock.
    pub fn get(self: *SharedData) u32 {
        self.mutex.lock();
        defer self.mutex.unlock();
        const snapshot = self.counter;
        return snapshot;
    }
};
/// Calls `shared.increment()` 10_000 times; each call locks the mutex itself.
fn incrementWorker(shared: *SharedData) void {
    for (0..10000) |_| shared.increment();
}
/// Has four threads each increment a shared mutex-protected counter 10_000
/// times, then prints the final value.
///
/// The workers hold a pointer to the stack-local `shared`, so every spawned
/// thread must be joined before this function returns. That includes the
/// error path where a later `spawn` fails; otherwise running workers would
/// keep writing into a dead stack frame.
pub fn sharedDataExample() !void {
    var shared = SharedData.init();
    const num_threads = 4;
    var threads: [num_threads]std.Thread = undefined;
    // Number of leading entries in `threads` that hold a live thread handle.
    var spawned: usize = 0;
    errdefer {
        for (threads[0..spawned]) |thread| thread.join();
    }

    for (&threads) |*thread| {
        thread.* = try std.Thread.spawn(.{}, incrementWorker, .{&shared});
        spawned += 1;
    }
    for (threads) |thread| {
        thread.join();
    }
    std.debug.print("Final counter: {}\n", .{shared.get()});
    // Expected: 40000 (4 threads × 10000)
}
同期プリミティブ
ミューテックス(Mutex)
排他制御を実現:
const std = @import("std");
/// Signed counter whose every operation runs while holding `mutex`.
pub const Counter = struct {
    value: i32,
    mutex: std.Thread.Mutex,

    /// Returns a counter starting at zero.
    pub fn init() Counter {
        return .{ .value = 0, .mutex = .{} };
    }

    /// Adds one under the lock.
    pub fn increment(self: *Counter) void {
        self.add(1);
    }

    /// Subtracts one under the lock.
    pub fn decrement(self: *Counter) void {
        self.add(-1);
    }

    /// Returns the current value, read under the lock.
    pub fn get(self: *Counter) i32 {
        self.mutex.lock();
        defer self.mutex.unlock();
        return self.value;
    }

    /// Applies `delta` while holding `mutex`; shared by increment/decrement.
    fn add(self: *Counter, delta: i32) void {
        self.mutex.lock();
        defer self.mutex.unlock();
        self.value += delta;
    }
};
読み書きロック(RwLock)
読み込みは並行、書き込みは排他:
const std = @import("std");
/// Fixed 100-byte buffer guarded by a reader-writer lock. Reads take the
/// shared side of the lock; writes take the exclusive side.
pub const SharedResource = struct {
    data: [100]u8,
    rwlock: std.Thread.RwLock,

    /// Returns a zero-filled buffer with an unlocked lock.
    pub fn init() SharedResource {
        return .{ .data = std.mem.zeroes([100]u8), .rwlock = .{} };
    }

    /// Returns `data[index]` under a shared (read) lock.
    pub fn read(self: *SharedResource, index: usize) u8 {
        self.rwlock.lockShared();
        defer self.rwlock.unlockShared();
        const byte = self.data[index];
        return byte;
    }

    /// Stores `value` at `data[index]` under the exclusive (write) lock.
    pub fn write(self: *SharedResource, index: usize, value: u8) void {
        self.rwlock.lock();
        defer self.rwlock.unlock();
        self.data[index] = value;
    }
};
/// Reads slot `id % 100` ten times, printing each value and pausing 50 ms.
fn reader(resource: *SharedResource, id: usize) void {
    const slot = id % 100;
    for (0..10) |_| {
        const value = resource.read(slot);
        std.debug.print("Reader {} read: {}\n", .{ id, value });
        std.time.sleep(50 * std.time.ns_per_ms);
    }
}
/// Writes the low byte of `id` into slot `id % 100` five times. It prints a
/// line after each write and sleeps 100 ms between writes.
fn writer(resource: *SharedResource, id: usize) void {
    // `data` stores u8s. `@intCast(id)` would panic in safe builds (and be
    // undefined behavior in ReleaseFast) for any id >= 256. Truncating gives
    // identical results for ids 0..255 and is well-defined above that.
    const byte: u8 = @truncate(id);
    const slot = id % 100;
    var i: usize = 0;
    while (i < 5) : (i += 1) {
        resource.write(slot, byte);
        std.debug.print("Writer {} wrote\n", .{id});
        std.time.sleep(std.time.ns_per_ms * 100);
    }
}
アトミック操作
ロックフリーな同期:
const std = @import("std");
/// Lock-free u32 counter built on `std.atomic.Value`.
///
/// All operations use `.monotonic` ordering: they are atomic with respect to
/// one another but impose no ordering on surrounding non-atomic accesses.
pub const AtomicCounter = struct {
    value: std.atomic.Value(u32),

    /// Returns a counter starting at zero.
    pub fn init() AtomicCounter {
        return AtomicCounter{
            .value = std.atomic.Value(u32).init(0),
        };
    }

    /// Atomically adds one.
    pub fn increment(self: *AtomicCounter) void {
        _ = self.value.fetchAdd(1, .monotonic);
    }

    /// Atomically loads the current value.
    pub fn get(self: *AtomicCounter) u32 {
        return self.value.load(.monotonic);
    }

    /// Atomically sets the value to `desired` if it currently equals
    /// `expected`. Returns true when the swap happened and false when the
    /// current value differed.
    pub fn compareAndSwap(self: *AtomicCounter, expected: u32, desired: u32) bool {
        // Use the strong variant. A weak CAS may fail spuriously even when the
        // value equals `expected`, which would make a single `false` result
        // meaningless to callers that do not retry in a loop.
        return self.value.cmpxchgStrong(expected, desired, .monotonic, .monotonic) == null;
    }
};
/// Bumps `counter` 10_000 times using its lock-free increment.
fn atomicWorker(counter: *AtomicCounter) void {
    for (0..10000) |_| counter.increment();
}
/// Has four threads each bump a shared `AtomicCounter` 10_000 times, then
/// prints the total.
///
/// The workers hold a pointer to the stack-local `counter`, so every spawned
/// thread is joined before returning, including when a later `spawn` fails.
pub fn atomicExample() !void {
    var counter = AtomicCounter.init();
    const num_threads = 4;
    var threads: [num_threads]std.Thread = undefined;
    // Number of leading entries in `threads` that hold a live thread handle.
    var spawned: usize = 0;
    errdefer {
        for (threads[0..spawned]) |thread| thread.join();
    }

    for (&threads) |*thread| {
        thread.* = try std.Thread.spawn(.{}, atomicWorker, .{&counter});
        spawned += 1;
    }
    for (threads) |thread| {
        thread.join();
    }
    // Every worker has been joined, so this load observes all increments.
    std.debug.print("Atomic counter: {}\n", .{counter.get()});
}
スレッドプール
基本的なスレッドプール
const std = @import("std");
/// A unit of work for `ThreadPool`: `func` is called with `data` on one of
/// the pool's worker threads. The pool does not own `data`; the submitter
/// must keep it alive until the task has run.
pub const Task = struct {
    func: *const fn (data: *anyopaque) void,
    data: *anyopaque,
};

/// Fixed-size thread pool with a FIFO task queue.
///
/// Everything the worker threads touch (queue, mutex, condition variable and
/// shutdown flag) lives in a heap-allocated `Shared` block. Workers receive a
/// pointer to that block, never to the `ThreadPool` value itself, so the
/// value returned by `init` can be copied or moved safely. Spawning workers
/// with a pointer to a stack-local pool that is then returned by value would
/// leave them with a dangling pointer.
pub const ThreadPool = struct {
    threads: []std.Thread,
    shared: *Shared,
    allocator: std.mem.Allocator,

    /// State shared between the pool handle and its worker threads.
    const Shared = struct {
        queue: std.ArrayList(Task),
        mutex: std.Thread.Mutex = .{},
        condition: std.Thread.Condition = .{},
        /// Set by `deinit`; workers exit once it is set and the queue is empty.
        shutdown: bool = false,
    };

    /// Creates the pool and starts `num_threads` worker threads.
    /// Call `deinit` to stop the workers and release all memory.
    /// On error, any workers already started are stopped and joined, and
    /// every allocation made so far is released.
    pub fn init(allocator: std.mem.Allocator, num_threads: usize) !ThreadPool {
        const shared = try allocator.create(Shared);
        errdefer allocator.destroy(shared);
        shared.* = .{ .queue = std.ArrayList(Task).init(allocator) };
        errdefer shared.queue.deinit();

        const threads = try allocator.alloc(std.Thread, num_threads);
        errdefer allocator.free(threads);

        // Number of leading entries in `threads` that hold a live thread.
        var spawned: usize = 0;
        errdefer {
            requestShutdown(shared);
            for (threads[0..spawned]) |thread| thread.join();
        }
        for (threads) |*thread| {
            thread.* = try std.Thread.spawn(.{}, workerThread, .{shared});
            spawned += 1;
        }

        return .{ .threads = threads, .shared = shared, .allocator = allocator };
    }

    /// Stops the pool. Tasks already in the queue are still executed; then
    /// all workers are joined and the pool's memory is freed.
    pub fn deinit(self: *ThreadPool) void {
        requestShutdown(self.shared);
        for (self.threads) |thread| {
            thread.join();
        }
        self.shared.queue.deinit();
        self.allocator.destroy(self.shared);
        self.allocator.free(self.threads);
    }

    /// Appends `task` to the queue and wakes one idle worker.
    /// Fails only if growing the queue fails to allocate.
    pub fn submit(self: *ThreadPool, task: Task) !void {
        const shared = self.shared;
        shared.mutex.lock();
        defer shared.mutex.unlock();
        try shared.queue.append(task);
        shared.condition.signal();
    }

    /// Sets the shutdown flag under the lock and wakes every waiting worker.
    fn requestShutdown(shared: *Shared) void {
        shared.mutex.lock();
        defer shared.mutex.unlock();
        shared.shutdown = true;
        shared.condition.broadcast();
    }

    /// Worker loop. It pops tasks in FIFO order and runs them outside the
    /// lock, and returns once shutdown is requested and the queue is empty.
    fn workerThread(shared: *Shared) void {
        while (true) {
            shared.mutex.lock();
            while (shared.queue.items.len == 0 and !shared.shutdown) {
                shared.condition.wait(&shared.mutex);
            }
            if (shared.queue.items.len == 0) {
                // Nothing left to drain and shutdown was requested.
                shared.mutex.unlock();
                return;
            }
            const task = shared.queue.orderedRemove(0);
            shared.mutex.unlock();
            // Run outside the lock so other workers can dequeue meanwhile.
            task.func(task.data);
        }
    }
};
// 使用例
/// Demo task: treats `data` as a `*u32`, prints the value, then sleeps 100 ms.
fn exampleTask(data: *anyopaque) void {
    const value_ptr: *u32 = @ptrCast(@alignCast(data));
    std.debug.print("Processing task with value: {}\n", .{value_ptr.*});
    std.time.sleep(100 * std.time.ns_per_ms);
}
/// Submits ten `exampleTask` jobs (values 1..10) to a four-thread pool.
///
/// No sleep is needed to wait for the jobs. The deferred `ThreadPool.deinit`
/// sets the shutdown flag, and workers exit only once the queue is empty;
/// `deinit` then joins them. So it returns only after every submitted task
/// has run. That happens before this function returns, while `values` (which
/// the tasks point into) is still alive.
pub fn threadPoolExample() !void {
    const allocator = std.heap.page_allocator;
    var pool = try ThreadPool.init(allocator, 4);
    defer pool.deinit();

    var values = [_]u32{ 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
    for (&values) |*value| {
        try pool.submit(.{
            .func = exampleTask,
            .data = value,
        });
    }
}
非同期I/O
標準ライブラリのスレッドプール
const std = @import("std");
/// Returns 0 + 1 + ... + (n - 1) as a u64.
/// `allocator` is accepted but not used.
fn computeTask(allocator: std.mem.Allocator, n: usize) !u64 {
    _ = allocator;
    var total: u64 = 0;
    for (0..n) |k| total += k;
    return total;
}
/// Runs ten summation jobs on `std.Thread.Pool` and prints each result.
///
/// Job `i` stores 0 + 1 + ... + ((i + 1) * 1000 - 1) into `results[i]`.
pub fn stdThreadPoolExample() !void {
    const allocator = std.heap.page_allocator;
    const WaitGroup = std.Thread.WaitGroup;

    // Allocate the output buffer before creating the pool. Deferred
    // statements run in reverse order, so `pool.deinit()` (which joins the
    // worker threads) now runs before `results` is freed. With the opposite
    // order, an error from `pool.spawn` below would free `results` while
    // already-queued jobs could still be writing into it.
    const results = try allocator.alloc(u64, 10);
    defer allocator.free(results);

    var pool: std.Thread.Pool = undefined;
    try pool.init(.{ .allocator = allocator });
    defer pool.deinit();

    // Default field values give an idle wait group; no `undefined` + `reset()`.
    var wg: WaitGroup = .{};

    for (results, 0..) |*result, i| {
        wg.start();
        try pool.spawn(struct {
            fn run(r: *u64, index: usize, wait_group: *WaitGroup) void {
                defer wait_group.finish();
                var sum: u64 = 0;
                var j: usize = 0;
                while (j < (index + 1) * 1000) : (j += 1) {
                    sum += j;
                }
                r.* = sum;
            }
        }.run, .{ result, i, &wg });
    }
    // Block until every job has called `finish`, running queued jobs on this
    // thread in the meantime.
    pool.waitAndWork(&wg);

    for (results, 0..) |result, i| {
        std.debug.print("Task {} result: {}\n", .{ i, result });
    }
}
Async/Await(実験的)
非同期関数の基本
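注意: Zigのasync/awaitは実験的機能であり、Zig 0.11以降のセルフホストコンパイラでは未実装です。そのため、現行バージョンでは `async`/`await`/`suspend` を使うコードはコンパイルできません。以下の最初の例は、通常の関数で処理の流れを示しています。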
const std = @import("std");
/// Returns 0 + 1 + ... + (n - 1) using u32 arithmetic.
/// Despite its name, this is an ordinary synchronous function.
fn asyncComputation(n: u32) u32 {
    var total: u32 = 0;
    for (0..n) |k| total += @as(u32, @intCast(k));
    return total;
}
/// Prints a start message, then computes `asyncComputation(1000)` and prints
/// the result. Despite its name, this runs synchronously.
fn asyncTask() void {
    std.debug.print("Async task started\n", .{});
    std.debug.print("Async task result: {}\n", .{asyncComputation(1000)});
}
// 注意: async/await の構文は将来変更される可能性があります
フレームの概念
// 非同期関数はフレームを持つ
/// Illustrates a suspendable (async) function: `suspend` pauses execution,
/// and in Zig's async model the function's frame holds its state until it is
/// resumed.
///
/// NOTE(review): to my knowledge async/`suspend` is not implemented by the
/// self-hosted compiler (Zig 0.11 and later), so this snippet does not
/// compile on current releases. Verify against the targeted Zig version.
fn asyncFunction() void {
suspend {
// Execution is suspended here.
}
// Code that runs after the frame is resumed.
}
並行パターン
Producer-Consumer パターン
const std = @import("std");
/// Bounded blocking FIFO of u32 values for the producer/consumer demo.
///
/// `enqueue` blocks while the queue holds `capacity` items, and `dequeue`
/// blocks while it is empty. All access is serialized by `mutex`.
const Queue = struct {
    items: std.ArrayList(u32),
    mutex: std.Thread.Mutex,
    not_empty: std.Thread.Condition,
    not_full: std.Thread.Condition,
    capacity: usize,

    /// Creates an empty queue that holds at most `capacity` items.
    /// Call `deinit` to release the backing storage.
    pub fn init(allocator: std.mem.Allocator, capacity: usize) Queue {
        return Queue{
            .items = std.ArrayList(u32).init(allocator),
            .mutex = std.Thread.Mutex{},
            .not_empty = std.Thread.Condition{},
            .not_full = std.Thread.Condition{},
            .capacity = capacity,
        };
    }

    /// Frees the backing storage.
    pub fn deinit(self: *Queue) void {
        self.items.deinit();
    }

    /// Appends `item`, blocking while the queue is full, then wakes one
    /// waiting consumer.
    ///
    /// Panics if growing the backing list fails to allocate. The `void`
    /// signature cannot propagate the error, and `catch unreachable` would
    /// be undefined behavior in release builds.
    pub fn enqueue(self: *Queue, item: u32) void {
        self.mutex.lock();
        defer self.mutex.unlock();
        while (self.items.items.len >= self.capacity) {
            self.not_full.wait(&self.mutex);
        }
        self.items.append(item) catch @panic("Queue.enqueue: out of memory");
        self.not_empty.signal();
    }

    /// Removes and returns the oldest item, blocking while the queue is
    /// empty, then wakes one waiting producer.
    pub fn dequeue(self: *Queue) u32 {
        self.mutex.lock();
        defer self.mutex.unlock();
        while (self.items.items.len == 0) {
            self.not_empty.wait(&self.mutex);
        }
        const item = self.items.orderedRemove(0);
        self.not_full.signal();
        return item;
    }
};
/// Enqueues ten items `id * 100 + k` (k = 0..9). It logs each item and
/// pauses 100 ms between items.
fn producer(queue: *Queue, id: usize) void {
    const base = id * 100;
    for (0..10) |offset| {
        const item: u32 = @intCast(base + offset);
        queue.enqueue(item);
        std.debug.print("Producer {} produced: {}\n", .{ id, item });
        std.time.sleep(100 * std.time.ns_per_ms);
    }
}
/// Dequeues ten items, blocking while the queue is empty. It logs each item
/// and pauses 150 ms between items.
fn consumer(queue: *Queue, id: usize) void {
    for (0..10) |_| {
        const item = queue.dequeue();
        std.debug.print("Consumer {} consumed: {}\n", .{ id, item });
        std.time.sleep(150 * std.time.ns_per_ms);
    }
}
ワーカープールパターン
const std = @import("std");
/// One job for the worker-pool demo: an identifier plus a payload value.
const WorkItem = struct {
    id: u32,
    data: u32,
};

/// Unbounded FIFO of `WorkItem`s shared between worker threads.
///
/// `pop` blocks until an item is available or the queue is closed. After
/// `close`, items still queued are handed out, and `pop` returns null only
/// once the queue is empty.
const WorkQueue = struct {
    items: std.ArrayList(WorkItem),
    mutex: std.Thread.Mutex,
    condition: std.Thread.Condition,
    shutdown: bool,

    /// Creates an open, empty queue.
    pub fn init(allocator: std.mem.Allocator) WorkQueue {
        return .{
            .items = std.ArrayList(WorkItem).init(allocator),
            .mutex = .{},
            .condition = .{},
            .shutdown = false,
        };
    }

    /// Frees the backing storage.
    pub fn deinit(self: *WorkQueue) void {
        self.items.deinit();
    }

    /// Appends `item` and wakes one waiting `pop`.
    pub fn push(self: *WorkQueue, item: WorkItem) !void {
        self.mutex.lock();
        defer self.mutex.unlock();
        try self.items.append(item);
        self.condition.signal();
    }

    /// Returns the oldest item, waiting if necessary. Returns null once the
    /// queue is closed and empty.
    pub fn pop(self: *WorkQueue) ?WorkItem {
        self.mutex.lock();
        defer self.mutex.unlock();
        while (true) {
            if (self.items.items.len > 0) return self.items.orderedRemove(0);
            if (self.shutdown) return null;
            self.condition.wait(&self.mutex);
        }
    }

    /// Marks the queue closed and wakes every waiting `pop`.
    pub fn close(self: *WorkQueue) void {
        self.mutex.lock();
        defer self.mutex.unlock();
        self.shutdown = true;
        self.condition.broadcast();
    }
};
/// Processes items from `queue` until `pop` returns null. It logs each item
/// and pauses 100 ms per item, then reports shutdown.
fn worker(queue: *WorkQueue, id: usize) void {
    while (true) {
        const item = queue.pop() orelse break;
        std.debug.print("Worker {} processing item {}: {}\n", .{ id, item.id, item.data });
        std.time.sleep(100 * std.time.ns_per_ms);
    }
    std.debug.print("Worker {} shutting down\n", .{id});
}
パフォーマンス最適化
キャッシュラインの考慮
const std = @import("std");
// 偽共有(False Sharing)を避ける
/// Atomic u64 counter that occupies a whole cache line. Counters placed next
/// to each other (e.g. one per thread in an array) therefore never share a
/// line, which avoids false sharing.
pub const PaddedCounter = struct {
    /// Aligned to the target's cache-line size instead of a hard-coded 64.
    /// `std.atomic.cache_line` is target-dependent (larger than 64 on some
    /// CPUs). The alignment also makes the struct size a multiple of a line.
    value: std.atomic.Value(u64) align(std.atomic.cache_line),
    /// Explicit filler so the struct spans exactly one cache line.
    padding: [padding_len]u8 = [_]u8{0} ** padding_len,

    const padding_len = std.atomic.cache_line - @sizeOf(std.atomic.Value(u64));

    /// Returns a counter starting at zero.
    pub fn init() PaddedCounter {
        return PaddedCounter{
            .value = std.atomic.Value(u64).init(0),
        };
    }

    /// Atomically adds one.
    pub fn increment(self: *PaddedCounter) void {
        _ = self.value.fetchAdd(1, .monotonic);
    }

    /// Atomically loads the current value.
    pub fn get(self: *const PaddedCounter) u64 {
        return self.value.load(.monotonic);
    }
};
ロックフリーデータ構造
const std = @import("std");
/// Lock-free LIFO stack of caller-owned nodes, linked through `Node.next` and
/// updated with a CAS loop on `head`.
///
/// The stack never allocates or frees memory: callers pass `Node`s to `push`
/// and get the same pointers back from `pop`.
///
/// WARNING(review): `pop` reads `old_head.next` after loading `head` but
/// before its CAS succeeds. If another thread pops that node and then frees
/// or re-pushes it in between, the read is a use-after-free, and the CAS can
/// succeed with a stale `next` (the ABA problem). As written, this is only
/// safe if popped nodes are never freed or reused while other threads may
/// still be inside `pop`. Confirm that callers uphold this, or add ABA
/// protection (tagged pointers, hazard pointers, epoch-based reclamation).
pub const LockFreeStack = struct {
head: std.atomic.Value(?*Node),
/// Intrusive list node; `next` is managed by the stack.
pub const Node = struct {
data: u32,
next: ?*Node,
};
/// Returns an empty stack.
pub fn init() LockFreeStack {
return LockFreeStack{
.head = std.atomic.Value(?*Node).init(null),
};
}
/// Pushes `node` onto the stack. The caller must keep `node` alive while
/// it is on the stack.
pub fn push(self: *LockFreeStack, node: *Node) void {
while (true) {
const old_head = self.head.load(.acquire);
// Link the new node in front of the observed head...
node.next = old_head;
// ...and publish it. `.release` on success makes the node's fields
// visible to threads that later load `head` with `.acquire`. Retry if
// `head` changed in the meantime or the weak CAS failed spuriously.
if (self.head.cmpxchgWeak(old_head, node, .release, .acquire) == null) {
break;
}
}
}
/// Pops and returns the top node, or null if the stack is empty.
/// See the ABA / use-after-free warning on the struct.
pub fn pop(self: *LockFreeStack) ?*Node {
while (true) {
const old_head = self.head.load(.acquire) orelse return null;
// `old_head.next` is read without protection; unsafe if `old_head`
// can be concurrently popped and freed/reused (see struct docs).
if (self.head.cmpxchgWeak(old_head, old_head.next, .release, .acquire) == null) {
return old_head;
}
}
}
};
まとめ
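この章では、Zigの並行処理と非同期プログラミングについて学びました:
- std.Thread によるスレッドの生成と join
- Mutex・RwLock・アトミック操作によるデータ競合の防止
- 自作のスレッドプールと標準ライブラリの std.Thread.Pool
- async/await の概念(実験的機能)
- Producer-Consumer やワーカープールなどの並行パターン
- 偽共有(False Sharing)の回避とロックフリーデータ構造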
次の章では、SIMD(Single Instruction Multiple Data)による並列計算について学びます。