Exercise 2: Epoll Deep Diveの実践演習
演習の目的
この演習では、Chapter 2とExplanation 2で学んだepollの知識を実践に移します。レベルトリガーとエッジトリガーの両方を実装し、パフォーマンスを測定し、実世界のユースケースに対応できる堅牢なサーバーを構築します。
前提条件
- Chapter 2の内容を理解している
- Zigの基本的な構文とエラーハンドリング
- Linux環境(epollはLinux固有)
- ネットワークプログラミングの基礎知識
- [ ] レベルトリガーepollサーバーの実装(20点)
- [ ] エッジトリガーepollサーバーの実装(25点)
- [ ] 適切なエラーハンドリング(15点)
- [ ] 接続管理とクリーンアップ(20点)
- [ ] EPOLLONESHOTを使ったマルチスレッドサーバー(5点)
- [ ] timerfdを統合したタイムアウト処理(5点)
- [ ] signalfdでのグレースフルシャットダウン(5点)
- [ ] パフォーマンスベンチマークと最適化(5点)
- TCPソケットをリスニング
- epollインスタンスを作成
- 新しい接続をepollに登録
- データの受信とエコーバック
- 接続の適切なクリーンアップ
評価基準
マンダトリー要件(80点)
以下の要件を全て満たすことで基本点が獲得できます:
ボーナス要件(20点)
以下の高度な機能を実装することで追加点が獲得できます:
演習1: レベルトリガーepollサーバー
目標
epollをレベルトリガーモード(デフォルト)で使用したエコーサーバーを実装します。
実装要件
スターターコード
const std = @import("std");
const os = std.os;
const linux = os.linux;
const net = std.net;
/// レベルトリガーepollサーバー
pub fn main() !void {
var gpa = std.heap.GeneralPurposeAllocator(.{}){};
defer _ = gpa.deinit();
const allocator = gpa.allocator();
// サーバーソケットの作成
const address = try net.Address.parseIp4("127.0.0.1", 8080);
var server = net.StreamServer.init(.{});
defer server.deinit();
try server.listen(address);
const server_fd = server.sockfd.?;
// TODO: epollインスタンスの作成
const epfd = linux.epoll_create1(linux.EPOLL_CLOEXEC);
if (epfd < 0) {
return error.EpollCreateFailed;
}
defer os.close(epfd);
// TODO: サーバーFDをepollに追加
std.debug.print("レベルトリガーepollサーバー起動: {}\n", .{address});
// TODO: イベントループの実装
try eventLoop(epfd, &server, allocator);
}
fn eventLoop(epfd: i32, server: *net.StreamServer, allocator: std.mem.Allocator) !void {
const max_events = 64;
var events: [max_events]linux.epoll_event = undefined;
// TODO: 接続管理用のデータ構造
// ヒント: HashMapで fd -> Connection のマッピング
while (true) {
// TODO: epoll_waitを呼び出す
// TODO: 各イベントを処理
// - サーバーFD: acceptNewConnection()
// - クライアントFD: handleClientData()
}
}
fn epollAdd(epfd: i32, fd: i32, events: u32) !void {
// TODO: epoll_ctl(EPOLL_CTL_ADD) を実装
}
fn epollDelete(epfd: i32, fd: i32) !void {
// TODO: epoll_ctl(EPOLL_CTL_DEL) を実装
}
fn acceptNewConnection(
epfd: i32,
server: *net.StreamServer,
connections: anytype
) !void {
// TODO: 新しい接続の受け付け
// TODO: epollに登録(レベルトリガー)
// TODO: connections に追加
}
fn handleClientData(
epfd: i32,
fd: i32,
connections: anytype
) !void {
var buffer: [4096]u8 = undefined;
// TODO: データの読み込み
// TODO: エコーバック
// TODO: 接続が閉じられた場合の処理
}
実装のヒント
// 接続情報を保存する構造体
const Connection = struct {
stream: net.Stream,
address: net.Address,
};
// epoll_ctlの実装例
fn epollAdd(epfd: i32, fd: i32, events: u32) !void {
var event = linux.epoll_event{
.events = events,
.data = linux.epoll_data{ .fd = fd },
};
const result = linux.epoll_ctl(epfd, linux.EPOLL_CTL_ADD, fd, &event);
if (result < 0) {
std.debug.print("epoll_ctl(ADD) 失敗: fd={}\n", .{fd});
return error.EpollCtlAddFailed;
}
}
// epoll_waitの実装例
const nfds = linux.epoll_wait(
epfd,
&events,
max_events,
-1 // 無限待機
);
if (nfds < 0) {
return error.EpollWaitFailed;
}
// イベント処理
for (events[0..@intCast(nfds)]) |event| {
const fd = event.data.fd;
if (fd == server.sockfd.?) {
// サーバーソケット: 新しい接続
try acceptNewConnection(epfd, server, &connections);
} else {
// クライアントソケット: データ読み込み
try handleClientData(epfd, fd, &connections);
}
}
テスト方法
# サーバーの起動
zig build-exe level_triggered_epoll.zig
./level_triggered_epoll
# 別のターミナルでテスト
echo "Hello, epoll!" | nc localhost 8080
# 複数クライアントの同時接続
for i in {1..100}; do
(echo "Client $i" | nc localhost 8080) &
done
wait
期待される動作
サーバー側:
レベルトリガーepollサーバー起動: 127.0.0.1:8080
新しい接続: fd=4, address=127.0.0.1:xxxxx
受信 (fd=4): 14 バイト
送信 (fd=4): 14 バイト
接続終了: fd=4
クライアント側:
$ echo "Hello, epoll!" | nc localhost 8080
Hello, epoll!
演習2: エッジトリガーepollサーバー
目標
epollをエッジトリガーモード(EPOLLET)で使用した高性能サーバーを実装します。
重要な注意点
エッジトリガーモードでは:
- ノンブロッキングI/Oが必須
- 全データを読み切る必要がある(WouldBlockまで)
- イベントの見逃しは致命的
- すべてのFDをノンブロッキングに設定
- EPOLLETフラグを使用
- WouldBlockまでデータを読み込む
- 書き込みバッファリングの実装
実装要件
スターターコード
const std = @import("std");
const os = std.os;
const linux = os.linux;
const net = std.net;
const Connection = struct {
stream: net.Stream,
address: net.Address,
read_buffer: std.ArrayList(u8),
write_buffer: std.ArrayList(u8),
pub fn init(allocator: std.mem.Allocator, stream: net.Stream, address: net.Address) Connection {
return Connection{
.stream = stream,
.address = address,
.read_buffer = std.ArrayList(u8).init(allocator),
.write_buffer = std.ArrayList(u8).init(allocator),
};
}
pub fn deinit(self: *Connection) void {
self.read_buffer.deinit();
self.write_buffer.deinit();
self.stream.close();
}
};
pub fn main() !void {
var gpa = std.heap.GeneralPurposeAllocator(.{}){};
defer _ = gpa.deinit();
const allocator = gpa.allocator();
// サーバーソケットの作成
const address = try net.Address.parseIp4("127.0.0.1", 8080);
var server = net.StreamServer.init(.{});
defer server.deinit();
try server.listen(address);
const server_fd = server.sockfd.?;
// サーバーFDをノンブロッキングに
try setNonBlocking(server_fd);
// epollインスタンスの作成
const epfd = linux.epoll_create1(linux.EPOLL_CLOEXEC);
if (epfd < 0) {
return error.EpollCreateFailed;
}
defer os.close(epfd);
// TODO: サーバーFDをepollに追加(エッジトリガー)
try epollAdd(epfd, server_fd, linux.EPOLLIN | linux.EPOLLET);
std.debug.print("エッジトリガーepollサーバー起動: {}\n", .{address});
try eventLoop(epfd, &server, allocator);
}
fn setNonBlocking(fd: i32) !void {
// TODO: O_NONBLOCKフラグを設定
}
fn eventLoop(epfd: i32, server: *net.StreamServer, allocator: std.mem.Allocator) !void {
const max_events = 64;
var events: [max_events]linux.epoll_event = undefined;
var connections = std.AutoHashMap(i32, Connection).init(allocator);
defer {
var iter = connections.iterator();
while (iter.next()) |entry| {
entry.value_ptr.deinit();
}
connections.deinit();
}
while (true) {
const nfds = linux.epoll_wait(epfd, &events, max_events, -1);
if (nfds < 0) {
return error.EpollWaitFailed;
}
for (events[0..@intCast(nfds)]) |event| {
const fd = event.data.fd;
// エラーまたは切断のチェック
if (event.events & (linux.EPOLLERR | linux.EPOLLHUP) != 0) {
try handleConnectionClose(epfd, fd, &connections);
continue;
}
if (fd == server.sockfd.?) {
// TODO: 新しい接続を受け付ける(複数かもしれない)
try acceptNewConnections(epfd, server, &connections, allocator);
} else {
// TODO: クライアントデータを処理
if (event.events & linux.EPOLLIN != 0) {
try handleClientRead(epfd, fd, &connections);
}
if (event.events & linux.EPOLLOUT != 0) {
try handleClientWrite(epfd, fd, &connections);
}
}
}
}
}
fn acceptNewConnections(
epfd: i32,
server: *net.StreamServer,
connections: *std.AutoHashMap(i32, Connection),
allocator: std.mem.Allocator
) !void {
// エッジトリガーでは、複数の接続が待機している可能性
while (true) {
const result = server.accept();
if (result) |connection| {
const client_fd = connection.stream.handle;
// TODO: ノンブロッキングに設定
try setNonBlocking(client_fd);
// TODO: epollに追加(エッジトリガー、読み込み)
try epollAdd(epfd, client_fd, linux.EPOLLIN | linux.EPOLLET);
// TODO: connectionsマップに追加
const conn = Connection.init(allocator, connection.stream, connection.address);
try connections.put(client_fd, conn);
std.debug.print("新しい接続: fd={}\n", .{client_fd});
} else |err| {
if (err == error.WouldBlock) {
// すべての接続を受け付けた
break;
}
return err;
}
}
}
fn handleClientRead(
epfd: i32,
fd: i32,
connections: *std.AutoHashMap(i32, Connection)
) !void {
var conn = connections.getPtr(fd) orelse return;
var buffer: [4096]u8 = undefined;
// TODO: WouldBlockまで読み込む
while (true) {
const result = os.read(fd, &buffer);
if (result) |bytes_read| {
if (bytes_read == 0) {
// EOF
try handleConnectionClose(epfd, fd, connections);
return;
}
std.debug.print("受信 (fd={}): {} バイト\n", .{fd, bytes_read});
// TODO: 読み込んだデータを処理
// TODO: エコーバックのため write_buffer に追加
try conn.write_buffer.appendSlice(buffer[0..bytes_read]);
// TODO: 書き込み可能イベントを監視
try epollModify(epfd, fd, linux.EPOLLIN | linux.EPOLLOUT | linux.EPOLLET);
} else |err| {
if (err == error.WouldBlock) {
// すべて読み切った(正常)
break;
}
// その他のエラー
std.debug.print("読み込みエラー (fd={}): {}\n", .{fd, err});
try handleConnectionClose(epfd, fd, connections);
return;
}
}
}
fn handleClientWrite(
epfd: i32,
fd: i32,
connections: *std.AutoHashMap(i32, Connection)
) !void {
var conn = connections.getPtr(fd) orelse return;
// TODO: write_bufferのデータを送信
while (conn.write_buffer.items.len > 0) {
const result = os.write(fd, conn.write_buffer.items);
if (result) |bytes_written| {
std.debug.print("送信 (fd={}): {} バイト\n", .{fd, bytes_written});
// TODO: 送信済みデータをバッファから削除
// ヒント: std.mem.copyForwards
if (bytes_written == conn.write_buffer.items.len) {
// すべて送信完了
conn.write_buffer.clearRetainingCapacity();
break;
} else {
// 部分的に送信
const remaining = conn.write_buffer.items[bytes_written..];
std.mem.copyForwards(u8, conn.write_buffer.items, remaining);
conn.write_buffer.shrinkRetainingCapacity(remaining.len);
}
} else |err| {
if (err == error.WouldBlock) {
// 送信できない(カーネルバッファが満杯)
// 次のEPOLLOUTで再試行
break;
}
// その他のエラー
std.debug.print("書き込みエラー (fd={}): {}\n", .{fd, err});
try handleConnectionClose(epfd, fd, connections);
return;
}
}
// TODO: write_bufferが空なら、EPOLLOUTを削除
if (conn.write_buffer.items.len == 0) {
try epollModify(epfd, fd, linux.EPOLLIN | linux.EPOLLET);
}
}
fn handleConnectionClose(
epfd: i32,
fd: i32,
connections: *std.AutoHashMap(i32, Connection)
) !void {
std.debug.print("接続終了: fd={}\n", .{fd});
// TODO: epollから削除
try epollDelete(epfd, fd);
// TODO: connectionsマップから削除
if (connections.fetchRemove(fd)) |entry| {
var conn = entry.value;
conn.deinit();
}
}
fn epollAdd(epfd: i32, fd: i32, events: u32) !void {
var event = linux.epoll_event{
.events = events,
.data = linux.epoll_data{ .fd = fd },
};
const result = linux.epoll_ctl(epfd, linux.EPOLL_CTL_ADD, fd, &event);
if (result < 0) {
return error.EpollCtlAddFailed;
}
}
fn epollModify(epfd: i32, fd: i32, events: u32) !void {
// TODO: EPOLL_CTL_MOD を実装
}
fn epollDelete(epfd: i32, fd: i32) !void {
const result = linux.epoll_ctl(epfd, linux.EPOLL_CTL_DEL, fd, null);
if (result < 0) {
return error.EpollCtlDelFailed;
}
}
テスト方法
# サーバーの起動
zig build-exe edge_triggered_epoll.zig
./edge_triggered_epoll
# 大量データの送信テスト
dd if=/dev/zero bs=1M count=10 | nc localhost 8080
# 複数クライアントの同時接続
for i in {1..1000}; do
(echo "Client $i" | nc localhost 8080) &
done
wait
演習3(ボーナス): EPOLLONESHOTマルチスレッドサーバー
目標
EPOLLONESHOTフラグを使って、マルチスレッド環境で安全なサーバーを実装します。
実装のヒント
const std = @import("std");
const os = std.os;
const linux = os.linux;
const WorkerPool = struct {
threads: []std.Thread,
work_queue: std.atomic.Queue(*Work),
allocator: std.mem.Allocator,
const Work = struct {
epfd: i32,
fd: i32,
event: linux.epoll_event,
};
pub fn init(allocator: std.mem.Allocator, thread_count: usize) !WorkerPool {
var threads = try allocator.alloc(std.Thread, thread_count);
for (threads) |*thread| {
thread.* = try std.Thread.spawn(.{}, workerThread, .{});
}
return WorkerPool{
.threads = threads,
.work_queue = std.atomic.Queue(*Work).init(),
.allocator = allocator,
};
}
fn workerThread() void {
// TODO: work_queueからタスクを取得して処理
}
};
pub fn oneshotServer() !void {
// TODO: EPOLLONESHOTを使ったサーバー実装
// ヒント: linux.EPOLLIN | linux.EPOLLONESHOT
}
演習4(ボーナス): timerfd統合
目標
timerfdを使って、タイムアウト処理を実装します。
実装要件
スターターコード
const Connection = struct {
stream: net.Stream,
last_activity: i64, // ミリ秒タイムスタンプ
pub fn updateActivity(self: *Connection) void {
self.last_activity = std.time.milliTimestamp();
}
pub fn isTimedOut(self: *const Connection, timeout_ms: i64) bool {
const now = std.time.milliTimestamp();
return (now - self.last_activity) > timeout_ms;
}
};
pub fn serverWithTimeout() !void {
// TODO: timerfdの作成
const timer_fd = linux.timerfd_create(
linux.CLOCK.MONOTONIC,
linux.TFD_NONBLOCK | linux.TFD_CLOEXEC
);
// TODO: 5秒ごとに発火するタイマー設定
var new_value = linux.itimerspec{
.it_value = .{ .tv_sec = 5, .tv_nsec = 0 },
.it_interval = .{ .tv_sec = 5, .tv_nsec = 0 },
};
_ = linux.timerfd_settime(timer_fd, 0, &new_value, null);
// TODO: epollに追加
try epollAdd(epfd, timer_fd, linux.EPOLLIN);
// TODO: イベントループでタイマーイベントを処理
}
fn checkTimeouts(
epfd: i32,
connections: *std.AutoHashMap(i32, Connection),
timeout_ms: i64
) !void {
// TODO: 全接続をチェックしてタイムアウト接続を閉じる
}
演習5(ボーナス): signalfd統合
目標
signalfdを使って、グレースフルシャットダウンを実装します。
実装要件
スターターコード
pub fn serverWithSignalHandling() !void {
var running = std.atomic.Atomic(bool).init(true);
// シグナルマスクの設定
var mask: linux.sigset_t = undefined;
linux.sigemptyset(&mask);
linux.sigaddset(&mask, linux.SIG.INT);
linux.sigaddset(&mask, linux.SIG.TERM);
// シグナルをブロック
_ = linux.sigprocmask(linux.SIG.BLOCK, &mask, null);
// signalfdの作成
const signal_fd = linux.signalfd(-1, &mask, linux.SFD_NONBLOCK | linux.SFD_CLOEXEC);
if (signal_fd < 0) {
return error.SignalFdCreateFailed;
}
defer os.close(signal_fd);
// TODO: epollに追加
try epollAdd(epfd, signal_fd, linux.EPOLLIN);
// TODO: イベントループでシグナル処理
while (running.load(.Acquire)) {
// イベント処理...
}
// TODO: クリーンアップ
std.debug.print("グレースフルシャットダウン中...\n", .{});
}
演習6(ボーナス): パフォーマンスベンチマーク
目標
レベルトリガーとエッジトリガーのパフォーマンスを測定・比較します。
測定項目
ベンチマークツール
// benchmark_epoll.zig
const std = @import("std");
const net = std.net;
const time = std.time;
pub fn main() !void {
const args = try std.process.argsAlloc(std.heap.page_allocator);
defer std.process.argsFree(std.heap.page_allocator, args);
if (args.len < 3) {
std.debug.print("使用法: {s} <ホスト> <ポート> [接続数] [リクエスト数]\n", .{args[0]});
return;
}
const host = args[1];
const port = try std.fmt.parseInt(u16, args[2], 10);
const connections = if (args.len > 3) try std.fmt.parseInt(usize, args[3], 10) else 100;
const requests_per_conn = if (args.len > 4) try std.fmt.parseInt(usize, args[4], 10) else 100;
std.debug.print("epollベンチマーク\n", .{});
std.debug.print("==================\n", .{});
std.debug.print("対象: {s}:{d}\n", .{host, port});
std.debug.print("同時接続: {d}\n", .{connections});
std.debug.print("接続あたりリクエスト: {d}\n", .{requests_per_conn});
std.debug.print("総リクエスト: {d}\n\n", .{connections * requests_per_conn});
const start = time.milliTimestamp();
// TODO: ベンチマーク実装
const total_requests = connections * requests_per_conn;
const end = time.milliTimestamp();
const elapsed = end - start;
std.debug.print("\n結果:\n", .{});
std.debug.print("==================\n", .{});
std.debug.print("総時間: {d} ms\n", .{elapsed});
std.debug.print("スループット: {d} req/s\n", .{total_requests * 1000 / @as(usize, @intCast(elapsed))});
std.debug.print("平均レイテンシ: {d} ms\n", .{elapsed / total_requests});
}
システムコール測定
# straceでシステムコール回数を測定
strace -c ./edge_triggered_epoll 2>&1 | grep -E "epoll|read|write"
# perf でパフォーマンス分析
perf stat -e syscalls:sys_enter_epoll_wait,syscalls:sys_enter_read,syscalls:sys_enter_write ./edge_triggered_epoll
期待される結果
パフォーマンス比較(10,000接続、100,000リクエスト):
レベルトリガー:
スループット: ~50,000 req/s
平均レイテンシ: 2ms
epoll_wait呼び出し: ~100,000回
CPU使用率: 15%
エッジトリガー:
スループット: ~80,000 req/s
平均レイテンシ: 1.2ms
epoll_wait呼び出し: ~50,000回
CPU使用率: 10%
結論: エッジトリガーは約1.6倍高速
システムコール回数が約半分
提出物
マンダトリー
level_triggered_epoll.zig- レベルトリガー実装edge_triggered_epoll.zig- エッジトリガー実装README.md- 実装の説明、テスト結果、学んだこと
ボーナス
oneshot_epoll.zig- EPOLLONESHOTマルチスレッド実装timeout_epoll.zig- timerfd統合signal_epoll.zig- signalfd統合benchmark_epoll.zig- ベンチマークツールPERFORMANCE.md- 詳細なパフォーマンス分析
トラブルシューティング
よくある問題
問題1: エッジトリガーでデータが読めなくなる
原因: 全データを読み切っていない
解決方法:
while (true) {
const result = os.read(fd, &buffer);
if (result) |bytes| {
// 処理
} else |err| {
if (err == error.WouldBlock) {
break; // 正常終了
}
return err;
}
}
問題2: ノンブロッキングでないFDをepollに追加
症状: プログラムがハングする
解決方法:
fn setNonBlocking(fd: i32) !void {
const flags = try os.fcntl(fd, os.F.GETFL, 0);
_ = try os.fcntl(fd, os.F.SETFL, flags | os.O.NONBLOCK);
}
// 各FDに対して呼び出す
try setNonBlocking(client_fd);
問題3: epoll_wait が EINVAL を返す
原因: 無効なパラメータ
確認事項:
- epfdが有効か
- maxeventsが正の数か
- eventsバッファが有効か
const nfds = linux.epoll_wait(epfd, &events, max_events, timeout);
if (nfds < 0) {
const err = std.os.errno(nfds);
std.debug.print("epoll_wait エラー: {}\n", .{err});
}
問題4: メモリリーク
原因: 接続クローズ時のクリーンアップ不足
解決方法:
fn handleConnectionClose(...) !void {
// 1. epollから削除
try epollDelete(epfd, fd);
// 2. ソケットをクローズ
os.close(fd);
// 3. Connection構造体のクリーンアップ
if (connections.fetchRemove(fd)) |entry| {
var conn = entry.value;
conn.deinit(); // バッファなどを解放
}
}
チェックリスト
レベルトリガー実装
- [ ] epollインスタンスが正しく作成される
- [ ] 新しい接続が受け付けられる
- [ ] データがエコーバックされる
- [ ] 接続が正しくクリーンアップされる
- [ ] エラーハンドリングが適切
エッジトリガー実装
- [ ] すべてのFDがノンブロッキング
- [ ] EPOLLETフラグが設定されている
- [ ] WouldBlockまでデータを読む
- [ ] 書き込みバッファリングが実装されている
- [ ] EPOLLOUT制御が正しい
ボーナス
- [ ] EPOLLONESHOTが正しく動作
- [ ] タイムアウト処理が機能する
- [ ] グレースフルシャットダウンが動作
- [ ] ベンチマーク結果が文書化されている
- レベルトリガーepollサーバー
- エッジトリガーepollサーバー
- EPOLLONESHOT(ボーナス)
- timerfd統合(ボーナス)
- signalfd統合(ボーナス)
- パフォーマンスベンチマーク(ボーナス)
- Linux man pages: epoll(7), timerfd_create(2), signalfd(2)
- The Linux Programming Interface (Michael Kerrisk)
- Linuxカーネルソースコード
- Zig標準ライブラリドキュメント
まとめ
この演習では以下を実装しました:
これらの実装を通じて、epollの深い理解と実践的なスキルを獲得しました。
次のステップ
次のLesson 3では、io_uringやその他の高度なI/Oパターンを学習します。