Explanation 1: I/Oモデル進化の深層解説
イントロダクション
このドキュメントでは、Chapter 1で学んだI/Oモデルの進化について、より技術的な詳細と歴史的背景を深掘りします。実際のカーネル実装、パフォーマンス特性、そして設計上のトレードオフについて詳しく解説します。
1. I/O操作のカーネルレベル理解
1.1 システムコールの内部動作
I/O操作は、ユーザースペースとカーネルスペース間の遷移を伴います:
ユーザースペース カーネルスペース
┌─────────────┐ ┌──────────────┐
│アプリケーション│ │ カーネル │
│ │ │ │
│ read(fd, buf, len) │ │
│ | │ │ │
│ v │ │ │
│ system call ├─────────>│ sys_read() │
│ インターフェース│ │ | │
│ │ │ v │
│ │ │ VFS layer │
│ │ │ | │
│ │ │ v │
│ │ │ File ops │
│ │ │ | │
│ │ │ v │
│ │ │ Device driver│
│ │ │ | │
│ │<─────────┤ データコピー │
│ │ │ │
└─────────────┘ └──────────────┘
1.2 ブロッキングI/Oのカーネル実装
// Linux カーネル内部の疑似コード
ssize_t sys_read(int fd, void *buf, size_t count) {
struct file *file = get_file(fd);
// ファイルディスクリプタの検証
if (!file) return -EBADF;
// データが利用可能になるまでプロセスをスリープ
while (!data_available(file)) {
// プロセスを待機キューに追加
add_to_wait_queue(file->wait_queue, current);
// プロセスの状態をSLEEPINGに変更
set_current_state(TASK_INTERRUPTIBLE);
// スケジューラに制御を渡す
schedule();
// ここで他のプロセスが実行される
// データ到着時、割り込みハンドラが呼び出される
// wake_up(&file->wait_queue) が実行される
}
// データをユーザースペースにコピー
return copy_to_user(buf, file->buffer, count);
}
プロセス状態遷移:
RUNNING ─(データなし)─> SLEEPING ─(データ到着)─> RUNNING
| | |
v v v
read() schedule() return data
| | |
| [他のプロセスが実行] |
└─────────────────────────────────────────────┘
1.3 ノンブロッキングI/Oのカーネル実装
// ノンブロッキングread()の疑似コード
ssize_t sys_read_nonblocking(int fd, void *buf, size_t count) {
struct file *file = get_file(fd);
if (!file) return -EBADF;
// O_NONBLOCK フラグをチェック
if (file->f_flags & O_NONBLOCK) {
if (!data_available(file)) {
// スリープせずにすぐに返す
return -EAGAIN; // または -EWOULDBLOCK
}
}
return copy_to_user(buf, file->buffer, count);
}
2. select/pollの詳細な実装とパフォーマンス分析
2.1 selectの内部実装
const std = @import("std");
const os = std.os;
const linux = os.linux;
// fd_setの内部構造(簡略化)
pub const FdSet = struct {
// 1024ビットのビットマップ(通常)
// 各ビットが1つのFDを表す
bits: [1024 / @bitSizeOf(usize)]usize,
pub fn zero(self: *FdSet) void {
for (&self.bits) |*word| {
word.* = 0;
}
}
pub fn set(self: *FdSet, fd: i32) void {
const word_idx = @as(usize, @intCast(fd)) / @bitSizeOf(usize);
const bit_idx = @as(u6, @intCast(@as(usize, @intCast(fd)) % @bitSizeOf(usize)));
self.bits[word_idx] |= (@as(usize, 1) << bit_idx);
}
pub fn isSet(self: *const FdSet, fd: i32) bool {
const word_idx = @as(usize, @intCast(fd)) / @bitSizeOf(usize);
const bit_idx = @as(u6, @intCast(@as(usize, @intCast(fd)) % @bitSizeOf(usize)));
return (self.bits[word_idx] & (@as(usize, 1) << bit_idx)) != 0;
}
};
selectのカーネル側実装(疑似コード):
int do_select(int nfds, fd_set *readfds, fd_set *writefds,
fd_set *exceptfds, struct timeval *timeout) {
int retval = 0;
struct poll_wqueues table;
// 待機キューテーブルを初期化
poll_initwait(&table);
for (;;) {
// 全てのファイルディスクリプタをスキャン
for (int i = 0; i < nfds; i++) {
if (FD_ISSET(i, readfds)) {
struct file *file = fget(i);
// ファイルのpollメソッドを呼び出す
unsigned int mask = file->f_op->poll(file, &table.pt);
if (mask & POLLIN) {
// 読み込み可能
retval++;
} else {
// まだ準備できていない
FD_CLR(i, readfds);
}
}
}
if (retval || timeout_expired)
break;
// まだイベントがない場合、スリープ
schedule_timeout(timeout);
}
poll_freewait(&table);
return retval;
}
2.2 selectのパフォーマンス問題の詳細分析
パフォーマンス分析(N個の接続、K個がアクティブ):
1. FDセットの準備(ユーザースペース)
時間: O(N) - 全FDをセットに追加
メモリ: 128バイト(1024 FDの場合)
2. カーネル空間へのコピー
時間: O(1) - 固定サイズのメモリコピー
メモリ: 128バイト × 3(read, write, except)
3. カーネル内でのスキャン
時間: O(N) - 全FDをチェック
for (i = 0; i < 1024; i++) {
if (FD_ISSET(i, &fds)) {
check_if_ready(i); // 各FDに対して
}
}
4. ユーザースペースへのコピー
時間: O(1)
メモリ: 128バイト × 3
5. 準備完了FDの検索(ユーザースペース)
時間: O(N) - 全FDを再度チェック
for (i = 0; i < 1024; i++) {
if (FD_ISSET(i, &ready_fds)) {
handle_fd(i);
}
}
合計時間複雑度: O(N) + O(N) = O(N)
具体的な数値例:
10,000接続の場合(実測値の概算):
select:
- FDセット準備: 10,000回のFD_SET呼び出し
- カーネルコピー: 128バイト × 3 = 384バイト
- カーネルスキャン: 10,000回のチェック
- 結果検索: 10,000回のFD_ISSETチェック
合計: 約20,000回のループ処理
1回のselectコール: 数ミリ秒
2.3 pollの実装とselectとの違い
const std = @import("std");
const os = std.os;
const linux = os.linux;
// pollfdの詳細な使用例
pub fn pollDetailedExample() !void {
const allocator = std.heap.page_allocator;
// pollfd配列を動的に確保
const fd_count = 10000;
var fds = try allocator.alloc(linux.pollfd, fd_count);
defer allocator.free(fds);
// 各pollfd構造体の設定
for (fds, 0..) |*pfd, i| {
pfd.* = linux.pollfd{
.fd = @intCast(i + 3), // FD 3から開始(0,1,2は標準I/O)
.events = linux.POLL.IN | linux.POLL.PRI,
.revents = 0,
};
}
// pollシステムコール
const timeout: i32 = 5000; // 5秒
const result = linux.poll(fds.ptr, @intCast(fds.len), timeout);
if (result > 0) {
std.debug.print("準備完了: {d} ファイルディスクリプタ\n", .{result});
// イベントのチェック
for (fds) |pfd| {
if (pfd.revents != 0) {
std.debug.print("FD {d}: ", .{pfd.fd});
if (pfd.revents & linux.POLL.IN != 0)
std.debug.print("読み込み可能 ", .{});
if (pfd.revents & linux.POLL.OUT != 0)
std.debug.print("書き込み可能 ", .{});
if (pfd.revents & linux.POLL.ERR != 0)
std.debug.print("エラー ", .{});
if (pfd.revents & linux.POLL.HUP != 0)
std.debug.print("切断 ", .{});
if (pfd.revents & linux.POLL.NVAL != 0)
std.debug.print("無効なFD ", .{});
std.debug.print("\n", .{});
}
}
} else if (result == 0) {
std.debug.print("タイムアウト\n", .{});
} else {
std.debug.print("エラー\n", .{});
}
}
pollのカーネル実装(疑似コード):
int do_poll(struct pollfd *ufds, unsigned int nfds, int timeout) {
struct poll_wqueues table;
int count = 0;
poll_initwait(&table);
for (;;) {
struct pollfd *pfd = ufds;
// 全てのpollfdをスキャン
for (unsigned int i = 0; i < nfds; i++, pfd++) {
struct file *file = fget(pfd->fd);
if (file) {
// デバイスドライバのpollを呼び出す
unsigned int mask = file->f_op->poll(file, &table.pt);
// イベントをチェック
if (mask & (POLLIN | POLLOUT | POLLERR | POLLHUP)) {
pfd->revents = mask;
count++;
} else {
pfd->revents = 0;
}
} else {
pfd->revents = POLLNVAL;
}
}
if (count || timeout_expired)
break;
schedule_timeout(timeout);
}
poll_freewait(&table);
return count;
}
2.4 select vs poll の比較表
| 特性 | select | poll |
|---|---|---|
| **インターフェース** | ビットマスク(fd_set) | 構造体配列(pollfd) |
| **最大FD数** | FD_SETSIZE(通常1024) | 制限なし(メモリ次第) |
| **FD番号** | 0からFD_SETSIZE-1 | 任意の値 |
| **イベント指定** | 3つの独立したセット | eventsフィールド |
| **結果の取得** | 同じfd_setを上書き | reventsフィールド |
| **ポータビリティ** | POSIX, 広くサポート | POSIX, 広くサポート |
| **時間複雑度** | O(N) | O(N) |
| **メモリコピー** | 固定サイズ(128バイト×3) | 可変サイズ(sizeof(pollfd)×N) |
3. C10K問題の歴史的背景
3.1 問題の発見と影響
1999年、Dan Kegelが「The C10K Problem」を発表:
1999年当時のハードウェア:
- CPU: 500MHz Pentium III
- RAM: 256MB
- ネットワーク: 100Mbps Ethernet
1接続あたりのリソース消費:
- スレッド: 1MB(スタック)
- コンテキストスイッチ: 数マイクロ秒
- 10,000接続 = 10GB RAM(不可能)
3.2 解決へのアプローチ
進化のタイムライン:
1983: select (BSD 4.2)
└─> O(N) スキャン、1024制限
1986: poll (SVR3)
└─> FD制限なし、依然O(N)
1995: IOCP (Windows NT 3.5)
└─> 真の非同期I/O
2000: kqueue (FreeBSD 4.1)
└─> O(1) イベント通知
2002: epoll (Linux 2.5.44)
└─> O(1) イベント通知
└─> C10K問題の Linux解決策
2011: io_uring (Linux 5.1, 提案)
└─> 真の非同期I/O
2019: io_uring (Linux 5.1, リリース)
└─> 次世代I/Oインターフェース
4. イベント駆動アーキテクチャの設計パターン
4.1 Reactor パターン
Reactorパターンは、イベント駆動システムの基本的なパターンです:
const std = @import("std");
/// Reactor パターンの実装例
pub const Reactor = struct {
const Self = @This();
handlers: std.AutoHashMap(i32, *Handler),
demultiplexer: Demultiplexer,
running: bool,
pub const Handler = struct {
handle_event: *const fn(*Handler, Event) anyerror!void,
};
pub const Event = struct {
fd: i32,
event_type: EventType,
};
pub const EventType = enum {
read,
write,
error,
};
pub const Demultiplexer = struct {
// select/poll/epollを抽象化
wait_for_events: *const fn(*Demultiplexer, []Event) anyerror!usize,
};
pub fn init(allocator: std.mem.Allocator) Self {
return Self{
.handlers = std.AutoHashMap(i32, *Handler).init(allocator),
.demultiplexer = undefined,
.running = false,
};
}
pub fn register_handler(self: *Self, fd: i32, handler: *Handler) !void {
try self.handlers.put(fd, handler);
}
pub fn remove_handler(self: *Self, fd: i32) void {
_ = self.handlers.remove(fd);
}
pub fn handle_events(self: *Self) !void {
self.running = true;
var event_buffer: [1024]Event = undefined;
while (self.running) {
// イベントの待機
const event_count = try self.demultiplexer.wait_for_events(
&self.demultiplexer,
&event_buffer
);
// 各イベントを処理
for (event_buffer[0..event_count]) |event| {
if (self.handlers.get(event.fd)) |handler| {
try handler.handle_event(handler, event);
}
}
}
}
pub fn stop(self: *Self) void {
self.running = false;
}
};
4.2 Proactor パターン
Proactorパターンは、非同期I/Oに適しています:
/// Proactor パターンの概念実装
pub const Proactor = struct {
const Self = @This();
completion_queue: std.ArrayList(Completion),
async_operations: std.AutoHashMap(u64, *AsyncOp),
next_op_id: u64,
pub const AsyncOp = struct {
op_id: u64,
callback: *const fn(*AsyncOp, []const u8) void,
};
pub const Completion = struct {
op_id: u64,
result: []const u8,
};
pub fn init(allocator: std.mem.Allocator) Self {
return Self{
.completion_queue = std.ArrayList(Completion).init(allocator),
.async_operations = std.AutoHashMap(u64, *AsyncOp).init(allocator),
.next_op_id = 0,
};
}
// 非同期操作を開始
pub fn async_read(self: *Self, fd: i32, op: *AsyncOp) !void {
op.op_id = self.next_op_id;
self.next_op_id += 1;
try self.async_operations.put(op.op_id, op);
// カーネルに非同期読み込みを要求
// (実装はプラットフォーム依存)
_ = fd;
}
// 完了したイベントを処理
pub fn handle_completions(self: *Self) !void {
while (self.completion_queue.items.len > 0) {
const completion = self.completion_queue.pop();
if (self.async_operations.get(completion.op_id)) |op| {
op.callback(op, completion.result);
_ = self.async_operations.remove(completion.op_id);
}
}
}
};
Reactor vs Proactor の違い:
Reactor (epoll, kqueue):
1. アプリケーションがイベント待機
2. カーネルが「読み込み可能」を通知
3. アプリケーションがread()を呼び出し
4. データを処理
Proactor (IOCP, io_uring):
1. アプリケーションが非同期読み込みを開始
2. カーネルがデータを読み込む
3. カーネルが「読み込み完了」を通知
4. アプリケーションがデータを処理
5. 実世界のアーキテクチャ詳細
5.1 Nginx の内部構造
Nginx のプロセスモデル:
Master Process (root権限)
├─ 設定の読み込み
├─ ソケットのバインド
├─ Workerプロセスの管理
└─ シグナル処理
Worker Process (非特権)
├─ イベントループ
│ ├─ epoll_wait() [Linux]
│ ├─ kqueue() [BSD/macOS]
│ └─ select() [フォールバック]
│
├─ 接続管理
│ ├─ accept()
│ ├─ read()
│ ├─ HTTP解析
│ ├─ プロキシ/静的ファイル
│ └─ write()
│
└─ タイマー管理
├─ キープアライブタイムアウト
├─ プロキシタイムアウト
└─ クライアントタイムアウト
Nginxの設定例とその影響:
# nginx.conf
worker_processes auto; # CPUコア数に合わせる
events {
worker_connections 10000; # 各ワーカーの最大接続数
use epoll; # epollを使用
multi_accept on; # 一度に複数の接続をaccept
}
# 合計接続数 = worker_processes × worker_connections
# 4コア × 10000 = 40,000 同時接続
5.2 Node.js のイベントループ
Node.js アーキテクチャ:
JavaScript層
↓
V8 Engine
↓
Node.js Bindings
↓
libuv (イベントループ)
↓
┌─────────────────────────────────┐
│ Event Loop (libuv) │
│ │
│ ┌──> Timers │
│ │ (setTimeout, setInterval) │
│ │ │
│ ├──> Pending I/O Callbacks │
│ │ │
│ ├──> Idle, Prepare │
│ │ │
│ ├──> Poll (I/O イベント) │
│ │ ├─ epoll (Linux) │
│ │ ├─ kqueue (macOS) │
│ │ └─ IOCP (Windows) │
│ │ │
│ ├──> Check │
│ │ (setImmediate) │
│ │ │
│ └──> Close Callbacks │
│ (socket.on('close')) │
└─────────────────────────────────┘
5.3 Redis のシングルスレッドモデル
// Redis イベントループ(簡略化)
void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
while (!eventLoop->stop) {
// 1. タイマーの前処理
if (eventLoop->beforesleep != NULL)
eventLoop->beforesleep(eventLoop);
// 2. I/Oイベントの処理
aeProcessEvents(eventLoop,
AE_ALL_EVENTS | AE_CALL_BEFORE_SLEEP);
}
}
int aeProcessEvents(aeEventLoop *eventLoop, int flags) {
int processed = 0, numevents;
// ファイルイベントの取得
numevents = aeApiPoll(eventLoop, tvp);
for (int j = 0; j < numevents; j++) {
aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
// 読み込みイベント
if (fe->mask & AE_READABLE) {
fe->rfileProc(eventLoop, fd, fe->clientData, mask);
processed++;
}
// 書き込みイベント
if (fe->mask & AE_WRITABLE) {
fe->wfileProc(eventLoop, fd, fe->clientData, mask);
processed++;
}
}
// タイマーイベント
processed += processTimeEvents(eventLoop);
return processed;
}
6. Zigでの実装詳細
6.1 クロスプラットフォームI/O抽象化
const std = @import("std");
const builtin = @import("builtin");
/// プラットフォーム固有のI/O多重化を抽象化
pub const EventMultiplexer = switch (builtin.os.tag) {
.linux => EpollMultiplexer,
.macos, .freebsd, .netbsd, .openbsd => KqueueMultiplexer,
.windows => IocpMultiplexer,
else => SelectMultiplexer, // フォールバック
};
pub const EpollMultiplexer = struct {
// epoll実装(次章で詳述)
};
pub const KqueueMultiplexer = struct {
// kqueue実装
};
pub const IocpMultiplexer = struct {
// IOCP実装
};
pub const SelectMultiplexer = struct {
// select実装(ポータビリティ用)
};
6.2 エラーハンドリングパターン
const std = @import("std");
pub const IoError = error{
WouldBlock,
ConnectionReset,
BrokenPipe,
TimedOut,
OutOfMemory,
};
pub fn readWithRetry(fd: std.os.fd_t, buffer: []u8) !usize {
var attempts: u32 = 0;
const max_attempts: u32 = 3;
while (attempts < max_attempts) : (attempts += 1) {
const result = std.os.read(fd, buffer);
if (result) |bytes_read| {
return bytes_read;
} else |err| {
switch (err) {
error.WouldBlock => {
// リトライ
std.time.sleep(10 * std.time.ns_per_ms);
continue;
},
error.ConnectionResetByPeer => {
return IoError.ConnectionReset;
},
else => return err,
}
}
}
return IoError.TimedOut;
}
7. パフォーマンスチューニング
7.1 ベンチマーク例
const std = @import("std");
const time = std.time;
pub fn benchmarkIoModel(comptime io_func: anytype, iterations: usize) !u64 {
const timer = try time.Timer.start();
const start = timer.read();
var i: usize = 0;
while (i < iterations) : (i += 1) {
try io_func();
}
const end = timer.read();
const elapsed = end - start;
const ns_per_op = elapsed / iterations;
std.debug.print("操作あたり: {d} ns\n", .{ns_per_op});
return ns_per_op;
}
7.2 最適化のチェックリスト
I/Oパフォーマンス最適化:
□ 適切なI/Oモデルの選択
├─ 接続数 < 1000: スレッドプール可
├─ 接続数 1000-10000: epoll/kqueue
└─ 接続数 > 10000: io_uring検討
□ バッファサイズの調整
├─ 小さすぎる: システムコール増加
└─ 大きすぎる: メモリ浪費
□ システムコールの最小化
├─ バッファリング
├─ バッチ処理
└─ ゼロコピー技術
□ カーネルパラメータの調整
├─ tcp_tw_reuse
├─ somaxconn
└─ tcp_max_syn_backlog
まとめ
このExplanationでは、I/Oモデルの深層を探りました:
- カーネルレベルでのI/O実装
- select/pollの内部動作とパフォーマンス特性
- C10K問題の歴史と解決策
- 実世界のアーキテクチャ(Nginx, Node.js, Redis)
- Zigでのクロスプラットフォーム実装
次のChapter 2では、これらの問題を解決するepollの詳細を学習します。