Explanation 1: I/Oモデル進化の深層解説

イントロダクション

このドキュメントでは、Chapter 1で学んだI/Oモデルの進化について、より技術的な詳細と歴史的背景を深掘りします。実際のカーネル実装、パフォーマンス特性、そして設計上のトレードオフについて詳しく解説します。

1. I/O操作のカーネルレベル理解

1.1 システムコールの内部動作

I/O操作は、ユーザースペースとカーネルスペース間の遷移を伴います:

ユーザースペース           カーネルスペース
┌─────────────┐          ┌──────────────┐
│アプリケーション│          │   カーネル    │
│             │          │              │
│ read(fd, buf, len)     │              │
│      |      │          │              │
│      v      │          │              │
│  system call ├─────────>│ sys_read()  │
│  インターフェース│          │      |       │
│             │          │      v       │
│             │          │ VFS layer    │
│             │          │      |       │
│             │          │      v       │
│             │          │ File ops     │
│             │          │      |       │
│             │          │      v       │
│             │          │ Device driver│
│             │          │      |       │
│             │<─────────┤ データコピー  │
│             │          │              │
└─────────────┘          └──────────────┘

1.2 ブロッキングI/Oのカーネル実装

// Linux カーネル内部の疑似コード
ssize_t sys_read(int fd, void *buf, size_t count) {
    struct file *file = get_file(fd);

    // ファイルディスクリプタの検証
    if (!file) return -EBADF;

    // データが利用可能になるまでプロセスをスリープ
    while (!data_available(file)) {
        // プロセスを待機キューに追加
        add_to_wait_queue(file->wait_queue, current);

        // プロセスの状態をSLEEPINGに変更
        set_current_state(TASK_INTERRUPTIBLE);

        // スケジューラに制御を渡す
        schedule();

        // ここで他のプロセスが実行される

        // データ到着時、割り込みハンドラが呼び出される
        // wake_up(&file->wait_queue) が実行される
    }

    // データをユーザースペースにコピー
    return copy_to_user(buf, file->buffer, count);
}

プロセス状態遷移:

RUNNING ─(データなし)─> SLEEPING ─(データ到着)─> RUNNING
   |                        |                       |
   v                        v                       v
 read()                 schedule()            return data
   |                        |                       |
   |                   [他のプロセスが実行]         |
   └─────────────────────────────────────────────┘

1.3 ノンブロッキングI/Oのカーネル実装

// ノンブロッキングread()の疑似コード
ssize_t sys_read_nonblocking(int fd, void *buf, size_t count) {
    struct file *file = get_file(fd);

    if (!file) return -EBADF;

    // O_NONBLOCK フラグをチェック
    if (file->f_flags & O_NONBLOCK) {
        if (!data_available(file)) {
            // スリープせずにすぐに返す
            return -EAGAIN;  // または -EWOULDBLOCK
        }
    }

    return copy_to_user(buf, file->buffer, count);
}

2. select/pollの詳細な実装とパフォーマンス分析

2.1 selectの内部実装

const std = @import("std");
const os = std.os;
const linux = os.linux;

// fd_setの内部構造(簡略化)
pub const FdSet = struct {
    // 1024ビットのビットマップ(通常)
    // 各ビットが1つのFDを表す
    bits: [1024 / @bitSizeOf(usize)]usize,

    pub fn zero(self: *FdSet) void {
        for (&self.bits) |*word| {
            word.* = 0;
        }
    }

    pub fn set(self: *FdSet, fd: i32) void {
        const word_idx = @as(usize, @intCast(fd)) / @bitSizeOf(usize);
        const bit_idx = @as(u6, @intCast(@as(usize, @intCast(fd)) % @bitSizeOf(usize)));
        self.bits[word_idx] |= (@as(usize, 1) << bit_idx);
    }

    pub fn isSet(self: *const FdSet, fd: i32) bool {
        const word_idx = @as(usize, @intCast(fd)) / @bitSizeOf(usize);
        const bit_idx = @as(u6, @intCast(@as(usize, @intCast(fd)) % @bitSizeOf(usize)));
        return (self.bits[word_idx] & (@as(usize, 1) << bit_idx)) != 0;
    }
};

selectのカーネル側実装(疑似コード):

int do_select(int nfds, fd_set *readfds, fd_set *writefds,
              fd_set *exceptfds, struct timeval *timeout) {
    int retval = 0;
    struct poll_wqueues table;

    // 待機キューテーブルを初期化
    poll_initwait(&table);

    for (;;) {
        // 全てのファイルディスクリプタをスキャン
        for (int i = 0; i < nfds; i++) {
            if (FD_ISSET(i, readfds)) {
                struct file *file = fget(i);
                // ファイルのpollメソッドを呼び出す
                unsigned int mask = file->f_op->poll(file, &table.pt);

                if (mask & POLLIN) {
                    // 読み込み可能
                    retval++;
                } else {
                    // まだ準備できていない
                    FD_CLR(i, readfds);
                }
            }
        }

        if (retval || timeout_expired)
            break;

        // まだイベントがない場合、スリープ
        schedule_timeout(timeout);
    }

    poll_freewait(&table);
    return retval;
}

2.2 selectのパフォーマンス問題の詳細分析

パフォーマンス分析(N個の接続、K個がアクティブ):

1. FDセットの準備(ユーザースペース)
   時間: O(N)  - 全FDをセットに追加
   メモリ: 128バイト(1024 FDの場合)

2. カーネル空間へのコピー
   時間: O(1)  - 固定サイズのメモリコピー
   メモリ: 128バイト × 3(read, write, except)

3. カーネル内でのスキャン
   時間: O(N)  - 全FDをチェック

   for (i = 0; i < 1024; i++) {
       if (FD_ISSET(i, &fds)) {
           check_if_ready(i);  // 各FDに対して
       }
   }

4. ユーザースペースへのコピー
   時間: O(1)
   メモリ: 128バイト × 3

5. 準備完了FDの検索(ユーザースペース)
   時間: O(N)  - 全FDを再度チェック

   for (i = 0; i < 1024; i++) {
       if (FD_ISSET(i, &ready_fds)) {
           handle_fd(i);
       }
   }

合計時間複雑度: O(N) + O(N) = O(N)

具体的な数値例:

10,000接続の場合(実測値の概算):

select:
- FDセット準備: 10,000回のFD_SET呼び出し
- カーネルコピー: 128バイト × 3 = 384バイト
- カーネルスキャン: 10,000回のチェック
- 結果検索: 10,000回のFD_ISSETチェック

合計: 約20,000回のループ処理
1回のselectコール: 数ミリ秒

2.3 pollの実装とselectとの違い

const std = @import("std");
const os = std.os;
const linux = os.linux;

// pollfdの詳細な使用例
pub fn pollDetailedExample() !void {
    const allocator = std.heap.page_allocator;

    // pollfd配列を動的に確保
    const fd_count = 10000;
    var fds = try allocator.alloc(linux.pollfd, fd_count);
    defer allocator.free(fds);

    // 各pollfd構造体の設定
    for (fds, 0..) |*pfd, i| {
        pfd.* = linux.pollfd{
            .fd = @intCast(i + 3),  // FD 3から開始(0,1,2は標準I/O)
            .events = linux.POLL.IN | linux.POLL.PRI,
            .revents = 0,
        };
    }

    // pollシステムコール
    const timeout: i32 = 5000;  // 5秒
    const result = linux.poll(fds.ptr, @intCast(fds.len), timeout);

    if (result > 0) {
        std.debug.print("準備完了: {d} ファイルディスクリプタ\n", .{result});

        // イベントのチェック
        for (fds) |pfd| {
            if (pfd.revents != 0) {
                std.debug.print("FD {d}: ", .{pfd.fd});

                if (pfd.revents & linux.POLL.IN != 0)
                    std.debug.print("読み込み可能 ", .{});
                if (pfd.revents & linux.POLL.OUT != 0)
                    std.debug.print("書き込み可能 ", .{});
                if (pfd.revents & linux.POLL.ERR != 0)
                    std.debug.print("エラー ", .{});
                if (pfd.revents & linux.POLL.HUP != 0)
                    std.debug.print("切断 ", .{});
                if (pfd.revents & linux.POLL.NVAL != 0)
                    std.debug.print("無効なFD ", .{});

                std.debug.print("\n", .{});
            }
        }
    } else if (result == 0) {
        std.debug.print("タイムアウト\n", .{});
    } else {
        std.debug.print("エラー\n", .{});
    }
}

pollのカーネル実装(疑似コード):

int do_poll(struct pollfd *ufds, unsigned int nfds, int timeout) {
    struct poll_wqueues table;
    int count = 0;

    poll_initwait(&table);

    for (;;) {
        struct pollfd *pfd = ufds;

        // 全てのpollfdをスキャン
        for (unsigned int i = 0; i < nfds; i++, pfd++) {
            struct file *file = fget(pfd->fd);

            if (file) {
                // デバイスドライバのpollを呼び出す
                unsigned int mask = file->f_op->poll(file, &table.pt);

                // イベントをチェック
                if (mask & (POLLIN | POLLOUT | POLLERR | POLLHUP)) {
                    pfd->revents = mask;
                    count++;
                } else {
                    pfd->revents = 0;
                }
            } else {
                pfd->revents = POLLNVAL;
            }
        }

        if (count || timeout_expired)
            break;

        schedule_timeout(timeout);
    }

    poll_freewait(&table);
    return count;
}

2.4 select vs poll の比較表

特性 select poll
**インターフェース** ビットマスク(fd_set) 構造体配列(pollfd)
**最大FD数** FD_SETSIZE(通常1024) 制限なし(メモリ次第)
**FD番号** 0からFD_SETSIZE-1 任意の値
**イベント指定** 3つの独立したセット eventsフィールド
**結果の取得** 同じfd_setを上書き reventsフィールド
**ポータビリティ** POSIX, 広くサポート POSIX, 広くサポート
**時間複雑度** O(N) O(N)
**メモリコピー** 固定サイズ(128バイト×3) 可変サイズ(sizeof(pollfd)×N)

3. C10K問題の歴史的背景

3.1 問題の発見と影響

1999年、Dan Kegelが「The C10K Problem」を発表:

1999年当時のハードウェア:
- CPU: 500MHz Pentium III
- RAM: 256MB
- ネットワーク: 100Mbps Ethernet

1接続あたりのリソース消費:
- スレッド: 1MB(スタック)
- コンテキストスイッチ: 数マイクロ秒
- 10,000接続 = 10GB RAM(不可能)

3.2 解決へのアプローチ

進化のタイムライン:

1983: select (BSD 4.2)
      └─> O(N) スキャン、1024制限

1986: poll (SVR3)
      └─> FD制限なし、依然O(N)

1995: IOCP (Windows NT 3.5)
      └─> 真の非同期I/O

2000: kqueue (FreeBSD 4.1)
      └─> O(1) イベント通知

2002: epoll (Linux 2.5.44)
      └─> O(1) イベント通知
      └─> C10K問題の Linux解決策

2011: io_uring (Linux 5.1, 提案)
      └─> 真の非同期I/O

2019: io_uring (Linux 5.1, リリース)
      └─> 次世代I/Oインターフェース

4. イベント駆動アーキテクチャの設計パターン

4.1 Reactor パターン

Reactorパターンは、イベント駆動システムの基本的なパターンです:

const std = @import("std");

/// Reactor パターンの実装例
pub const Reactor = struct {
    const Self = @This();

    handlers: std.AutoHashMap(i32, *Handler),
    demultiplexer: Demultiplexer,
    running: bool,

    pub const Handler = struct {
        handle_event: *const fn(*Handler, Event) anyerror!void,
    };

    pub const Event = struct {
        fd: i32,
        event_type: EventType,
    };

    pub const EventType = enum {
        read,
        write,
        error,
    };

    pub const Demultiplexer = struct {
        // select/poll/epollを抽象化
        wait_for_events: *const fn(*Demultiplexer, []Event) anyerror!usize,
    };

    pub fn init(allocator: std.mem.Allocator) Self {
        return Self{
            .handlers = std.AutoHashMap(i32, *Handler).init(allocator),
            .demultiplexer = undefined,
            .running = false,
        };
    }

    pub fn register_handler(self: *Self, fd: i32, handler: *Handler) !void {
        try self.handlers.put(fd, handler);
    }

    pub fn remove_handler(self: *Self, fd: i32) void {
        _ = self.handlers.remove(fd);
    }

    pub fn handle_events(self: *Self) !void {
        self.running = true;

        var event_buffer: [1024]Event = undefined;

        while (self.running) {
            // イベントの待機
            const event_count = try self.demultiplexer.wait_for_events(
                &self.demultiplexer,
                &event_buffer
            );

            // 各イベントを処理
            for (event_buffer[0..event_count]) |event| {
                if (self.handlers.get(event.fd)) |handler| {
                    try handler.handle_event(handler, event);
                }
            }
        }
    }

    pub fn stop(self: *Self) void {
        self.running = false;
    }
};

4.2 Proactor パターン

Proactorパターンは、非同期I/Oに適しています:

/// Proactor パターンの概念実装
pub const Proactor = struct {
    const Self = @This();

    completion_queue: std.ArrayList(Completion),
    async_operations: std.AutoHashMap(u64, *AsyncOp),
    next_op_id: u64,

    pub const AsyncOp = struct {
        op_id: u64,
        callback: *const fn(*AsyncOp, []const u8) void,
    };

    pub const Completion = struct {
        op_id: u64,
        result: []const u8,
    };

    pub fn init(allocator: std.mem.Allocator) Self {
        return Self{
            .completion_queue = std.ArrayList(Completion).init(allocator),
            .async_operations = std.AutoHashMap(u64, *AsyncOp).init(allocator),
            .next_op_id = 0,
        };
    }

    // 非同期操作を開始
    pub fn async_read(self: *Self, fd: i32, op: *AsyncOp) !void {
        op.op_id = self.next_op_id;
        self.next_op_id += 1;

        try self.async_operations.put(op.op_id, op);

        // カーネルに非同期読み込みを要求
        // (実装はプラットフォーム依存)
        _ = fd;
    }

    // 完了したイベントを処理
    pub fn handle_completions(self: *Self) !void {
        while (self.completion_queue.items.len > 0) {
            const completion = self.completion_queue.pop();

            if (self.async_operations.get(completion.op_id)) |op| {
                op.callback(op, completion.result);
                _ = self.async_operations.remove(completion.op_id);
            }
        }
    }
};

Reactor vs Proactor の違い:

Reactor (epoll, kqueue):
1. アプリケーションがイベント待機
2. カーネルが「読み込み可能」を通知
3. アプリケーションがread()を呼び出し
4. データを処理

Proactor (IOCP, io_uring):
1. アプリケーションが非同期読み込みを開始
2. カーネルがデータを読み込む
3. カーネルが「読み込み完了」を通知
4. アプリケーションがデータを処理

5. 実世界のアーキテクチャ詳細

5.1 Nginx の内部構造

Nginx のプロセスモデル:

Master Process (root権限)
├─ 設定の読み込み
├─ ソケットのバインド
├─ Workerプロセスの管理
└─ シグナル処理

Worker Process (非特権)
├─ イベントループ
│   ├─ epoll_wait() [Linux]
│   ├─ kqueue() [BSD/macOS]
│   └─ select() [フォールバック]
│
├─ 接続管理
│   ├─ accept()
│   ├─ read()
│   ├─ HTTP解析
│   ├─ プロキシ/静的ファイル
│   └─ write()
│
└─ タイマー管理
    ├─ キープアライブタイムアウト
    ├─ プロキシタイムアウト
    └─ クライアントタイムアウト

Nginxの設定例とその影響:

# nginx.conf
worker_processes auto;  # CPUコア数に合わせる

events {
    worker_connections 10000;  # 各ワーカーの最大接続数
    use epoll;                 # epollを使用
    multi_accept on;           # 一度に複数の接続をaccept
}

# 合計接続数 = worker_processes × worker_connections
# 4コア × 10000 = 40,000 同時接続

5.2 Node.js のイベントループ

Node.js アーキテクチャ:

JavaScript層
    ↓
V8 Engine
    ↓
Node.js Bindings
    ↓
libuv (イベントループ)
    ↓
┌─────────────────────────────────┐
│      Event Loop (libuv)         │
│                                 │
│  ┌──> Timers                    │
│  │    (setTimeout, setInterval) │
│  │                              │
│  ├──> Pending I/O Callbacks     │
│  │                              │
│  ├──> Idle, Prepare             │
│  │                              │
│  ├──> Poll (I/O イベント)       │
│  │    ├─ epoll (Linux)          │
│  │    ├─ kqueue (macOS)         │
│  │    └─ IOCP (Windows)         │
│  │                              │
│  ├──> Check                     │
│  │    (setImmediate)            │
│  │                              │
│  └──> Close Callbacks           │
│       (socket.on('close'))      │
└─────────────────────────────────┘

5.3 Redis のシングルスレッドモデル

// Redis イベントループ(簡略化)
void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;

    while (!eventLoop->stop) {
        // 1. タイマーの前処理
        if (eventLoop->beforesleep != NULL)
            eventLoop->beforesleep(eventLoop);

        // 2. I/Oイベントの処理
        aeProcessEvents(eventLoop,
            AE_ALL_EVENTS | AE_CALL_BEFORE_SLEEP);
    }
}

int aeProcessEvents(aeEventLoop *eventLoop, int flags) {
    int processed = 0, numevents;

    // ファイルイベントの取得
    numevents = aeApiPoll(eventLoop, tvp);

    for (int j = 0; j < numevents; j++) {
        aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];

        // 読み込みイベント
        if (fe->mask & AE_READABLE) {
            fe->rfileProc(eventLoop, fd, fe->clientData, mask);
            processed++;
        }

        // 書き込みイベント
        if (fe->mask & AE_WRITABLE) {
            fe->wfileProc(eventLoop, fd, fe->clientData, mask);
            processed++;
        }
    }

    // タイマーイベント
    processed += processTimeEvents(eventLoop);

    return processed;
}

6. Zigでの実装詳細

6.1 クロスプラットフォームI/O抽象化

const std = @import("std");
const builtin = @import("builtin");

/// プラットフォーム固有のI/O多重化を抽象化
pub const EventMultiplexer = switch (builtin.os.tag) {
    .linux => EpollMultiplexer,
    .macos, .freebsd, .netbsd, .openbsd => KqueueMultiplexer,
    .windows => IocpMultiplexer,
    else => SelectMultiplexer,  // フォールバック
};

pub const EpollMultiplexer = struct {
    // epoll実装(次章で詳述)
};

pub const KqueueMultiplexer = struct {
    // kqueue実装
};

pub const IocpMultiplexer = struct {
    // IOCP実装
};

pub const SelectMultiplexer = struct {
    // select実装(ポータビリティ用)
};

6.2 エラーハンドリングパターン

const std = @import("std");

pub const IoError = error{
    WouldBlock,
    ConnectionReset,
    BrokenPipe,
    TimedOut,
    OutOfMemory,
};

pub fn readWithRetry(fd: std.os.fd_t, buffer: []u8) !usize {
    var attempts: u32 = 0;
    const max_attempts: u32 = 3;

    while (attempts < max_attempts) : (attempts += 1) {
        const result = std.os.read(fd, buffer);

        if (result) |bytes_read| {
            return bytes_read;
        } else |err| {
            switch (err) {
                error.WouldBlock => {
                    // リトライ
                    std.time.sleep(10 * std.time.ns_per_ms);
                    continue;
                },
                error.ConnectionResetByPeer => {
                    return IoError.ConnectionReset;
                },
                else => return err,
            }
        }
    }

    return IoError.TimedOut;
}

7. パフォーマンスチューニング

7.1 ベンチマーク例

const std = @import("std");
const time = std.time;

pub fn benchmarkIoModel(comptime io_func: anytype, iterations: usize) !u64 {
    const timer = try time.Timer.start();
    const start = timer.read();

    var i: usize = 0;
    while (i < iterations) : (i += 1) {
        try io_func();
    }

    const end = timer.read();
    const elapsed = end - start;

    const ns_per_op = elapsed / iterations;
    std.debug.print("操作あたり: {d} ns\n", .{ns_per_op});

    return ns_per_op;
}

7.2 最適化のチェックリスト

I/Oパフォーマンス最適化:

□ 適切なI/Oモデルの選択
  ├─ 接続数 < 1000: スレッドプール可
  ├─ 接続数 1000-10000: epoll/kqueue
  └─ 接続数 > 10000: io_uring検討

□ バッファサイズの調整
  ├─ 小さすぎる: システムコール増加
  └─ 大きすぎる: メモリ浪費

□ システムコールの最小化
  ├─ バッファリング
  ├─ バッチ処理
  └─ ゼロコピー技術

□ カーネルパラメータの調整
  ├─ tcp_tw_reuse
  ├─ somaxconn
  └─ tcp_max_syn_backlog

まとめ

このExplanationでは、I/Oモデルの深層を探りました:

  • カーネルレベルでのI/O実装
  • select/pollの内部動作とパフォーマンス特性
  • C10K問題の歴史と解決策
  • 実世界のアーキテクチャ(Nginx, Node.js, Redis)
  • Zigでのクロスプラットフォーム実装

次のChapter 2では、これらの問題を解決するepollの詳細を学習します。