Explanation 04: Network Server Design

完全なサーバー実装

このセクションでは、本格的なネットワークサーバーを段階的に構築していきます。

Phase 1: 基本的なTCPサーバー

Echo Server with Connection Pool

const std = @import("std");
const net = std.net;
const os = std.os;
const linux = os.linux;

/// Single-threaded, epoll-driven TCP echo server backed by a fixed-size
/// `ConnectionPool`.
///
/// Lifecycle: `init` binds and registers the listening socket, `run` drives
/// the event loop until the shutdown helper stops accepting connections, and
/// `deinit` releases every OS resource the server still owns.
///
/// NOTE(review): `GracefulShutdown` is not defined in this file; its methods
/// are used here exactly as the call sites show (`shouldAcceptConnections`,
/// `onConnectionAccepted`, `onConnectionClosed`, `wait`).
pub const EchoServer = struct {
    allocator: std.mem.Allocator,
    address: net.Address,
    listener: net.Server,
    pool: ConnectionPool,
    shutdown: GracefulShutdown,
    epoll_fd: i32,

    const Self = @This();

    /// Idle time (ms) after which the event loop closes a connection.
    const idle_timeout_ms: i64 = 60_000;

    /// Upper bound (ms) on a single `epoll_wait` call. A finite value lets the
    /// loop re-check the shutdown flag and idle timeouts even when no socket
    /// becomes ready.
    const poll_interval_ms: i32 = 1_000;

    /// Binds `0.0.0.0:port`, creates the epoll instance, registers the
    /// listener with it and allocates a pool of `max_connections` slots.
    ///
    /// On any failure every resource acquired so far is released.
    pub fn init(
        allocator: std.mem.Allocator,
        port: u16,
        max_connections: usize,
    ) !Self {
        const address = try net.Address.parseIp("0.0.0.0", port);
        var listener = try address.listen(.{
            .reuse_address = true,
            .reuse_port = true,
        });
        // Without this the listening socket leaks if a later step fails.
        errdefer listener.deinit();

        // Create epoll instance
        const epoll_fd = try os.epoll_create1(linux.EPOLL.CLOEXEC);
        errdefer os.close(epoll_fd);

        // Add listener to epoll (level-triggered: no EPOLLET flag)
        var event = linux.epoll_event{
            .events = linux.EPOLL.IN,
            .data = .{ .fd = listener.stream.handle },
        };
        try os.epoll_ctl(
            epoll_fd,
            linux.EPOLL.CTL_ADD,
            listener.stream.handle,
            &event,
        );

        return Self{
            .allocator = allocator,
            .address = address,
            .listener = listener,
            .pool = try ConnectionPool.init(allocator, max_connections),
            .shutdown = GracefulShutdown.init(30000), // 30s timeout
            .epoll_fd = epoll_fd,
        };
    }

    /// Runs the event loop until `shutdown.shouldAcceptConnections()` returns
    /// false, then waits on the shutdown helper.
    ///
    /// Per-connection errors are logged and the connection is closed; they do
    /// not stop the loop.
    pub fn run(self: *Self) !void {
        std.log.info("Echo server listening on {}", .{self.address});

        var events: [128]linux.epoll_event = undefined;

        while (self.shutdown.shouldAcceptConnections()) {
            const n = os.epoll_wait(self.epoll_fd, &events, poll_interval_ms);

            for (events[0..@intCast(n)]) |event| {
                if (event.data.fd == self.listener.stream.handle) {
                    // Accept new connection
                    self.acceptConnection() catch |err| {
                        std.log.err("Failed to accept connection: {}", .{err});
                    };
                } else {
                    // Handle existing connection
                    self.handleConnection(event.data.fd) catch |err| {
                        std.log.err("Error handling connection: {}", .{err});
                        self.closeConnection(event.data.fd);
                    };
                }
            }

            // Runs at least once per poll interval, even on an idle server.
            self.closeTimedOutConnections(idle_timeout_ms);
        }

        // Graceful shutdown
        try self.shutdown.wait();
        std.log.info("Server shutdown complete", .{});
    }

    /// Closes every non-idle connection whose last activity is older than
    /// `timeout_ms`. Going through `closeConnection` closes the socket and
    /// updates shutdown tracking in addition to returning the pool slot.
    fn closeTimedOutConnections(self: *Self, timeout_ms: i64) void {
        const now = std.time.milliTimestamp();

        for (self.pool.connections) |*conn| {
            if (conn.state == .idle or conn.fd < 0) continue;
            if (now - conn.last_activity <= timeout_ms) continue;

            std.log.warn("Connection timeout: fd={}", .{conn.fd});
            self.closeConnection(conn.fd);
        }
    }

    /// Accepts one pending connection, configures the socket, binds it to a
    /// pool slot and registers it with epoll (edge-triggered, readable).
    ///
    /// If the pool is full the connection is closed and the call returns
    /// normally. On error the socket is closed and the slot is returned.
    fn acceptConnection(self: *Self) !void {
        if (!self.shutdown.shouldAcceptConnections()) {
            return;
        }

        const conn = try self.listener.accept();
        // Close the accepted socket if any later step fails.
        errdefer conn.stream.close();

        // Configure socket
        try setNonBlocking(conn.stream.handle);
        try setNoDelay(conn.stream.handle);

        // Acquire connection from pool
        const pool_conn = self.pool.acquire() orelse {
            std.log.warn("Connection pool full, rejecting connection", .{});
            conn.stream.close();
            return;
        };
        // Give the slot back if epoll registration fails.
        errdefer self.pool.release(pool_conn);

        pool_conn.fd = conn.stream.handle;
        pool_conn.state = .reading;
        pool_conn.read_offset = 0;
        pool_conn.write_offset = 0;
        pool_conn.last_activity = std.time.milliTimestamp();

        // Add to epoll
        var event = linux.epoll_event{
            .events = linux.EPOLL.IN | linux.EPOLL.ET, // Edge-triggered
            .data = .{ .fd = conn.stream.handle },
        };
        try os.epoll_ctl(
            self.epoll_fd,
            linux.EPOLL.CTL_ADD,
            conn.stream.handle,
            &event,
        );

        self.shutdown.onConnectionAccepted();
        std.log.info("New connection accepted: fd={}", .{conn.stream.handle});
    }

    /// Dispatches a readiness event for `fd` according to the connection's
    /// state. Returns `error.ConnectionNotFound` when `fd` has no pool slot.
    fn handleConnection(self: *Self, fd: i32) !void {
        const conn = self.pool.findByFd(fd) orelse return error.ConnectionNotFound;
        conn.last_activity = std.time.milliTimestamp();

        switch (conn.state) {
            .reading => try self.handleRead(conn),
            .writing => try self.handleWrite(conn),
            else => {},
        }
    }

    /// Performs one read into the connection buffer. On success the bytes are
    /// marked for echoing, the state becomes `.writing` and epoll is re-armed
    /// for writability. A zero-length read closes the connection;
    /// `error.WouldBlock` leaves it untouched.
    fn handleRead(self: *Self, conn: *ConnectionPool.Connection) !void {
        const n = os.read(
            conn.fd,
            conn.buffer[conn.read_offset..],
        ) catch |err| {
            if (err == error.WouldBlock) return;
            return err;
        };

        if (n == 0) {
            // Connection closed by peer
            self.closeConnection(conn.fd);
            return;
        }

        conn.read_offset += n;

        // Echo: the data is sent back from the same buffer, so the number of
        // bytes read becomes the number of bytes to write.
        conn.write_offset = conn.read_offset;
        conn.read_offset = 0;
        conn.state = .writing;

        // Modify epoll to watch for EPOLLOUT; we are notified when writable.
        var event = linux.epoll_event{
            .events = linux.EPOLL.OUT | linux.EPOLL.ET,
            .data = .{ .fd = conn.fd },
        };
        try os.epoll_ctl(
            self.epoll_fd,
            linux.EPOLL.CTL_MOD,
            conn.fd,
            &event,
        );
    }

    /// Writes the pending bytes `buffer[0..write_offset]`. When the socket
    /// would block, the unsent tail is moved to the front of the buffer and
    /// the connection stays in `.writing`. Once everything is sent the state
    /// returns to `.reading` and epoll is re-armed for readability.
    fn handleWrite(self: *Self, conn: *ConnectionPool.Connection) !void {
        var written: usize = 0;

        while (written < conn.write_offset) {
            const n = os.write(
                conn.fd,
                conn.buffer[written..conn.write_offset],
            ) catch |err| {
                if (err == error.WouldBlock) {
                    // Shift remaining data to start of buffer
                    std.mem.copyForwards(
                        u8,
                        conn.buffer[0..],
                        conn.buffer[written..conn.write_offset],
                    );
                    conn.write_offset -= written;
                    return;
                }
                return err;
            };

            written += n;
        }

        // All data written, switch back to reading
        conn.write_offset = 0;
        conn.state = .reading;

        var event = linux.epoll_event{
            .events = linux.EPOLL.IN | linux.EPOLL.ET,
            .data = .{ .fd = conn.fd },
        };
        try os.epoll_ctl(
            self.epoll_fd,
            linux.EPOLL.CTL_MOD,
            conn.fd,
            &event,
        );
    }

    /// Closes `fd`, returns its slot to the pool and notifies the shutdown
    /// helper. Does nothing when `fd` is not tracked by the pool.
    fn closeConnection(self: *Self, fd: i32) void {
        if (self.pool.findByFd(fd)) |conn| {
            os.close(fd);
            self.pool.release(conn);
            self.shutdown.onConnectionClosed();
            std.log.info("Connection closed: fd={}", .{fd});
        }
    }

    /// Closes any client sockets still held in the pool, then the epoll
    /// instance, the listener and the pool itself.
    pub fn deinit(self: *Self) void {
        // Slots with a non-negative fd still own an open socket.
        for (self.pool.connections) |*conn| {
            if (conn.fd >= 0) os.close(conn.fd);
        }
        os.close(self.epoll_fd);
        self.listener.deinit();
        self.pool.deinit();
    }
};

/// Adds `O_NONBLOCK` to the file status flags of `fd`, keeping the flags that
/// are already set. Propagates any error from the two `fcntl` calls.
fn setNonBlocking(fd: i32) !void {
    const current = try os.fcntl(fd, os.F.GETFL, 0);
    const updated = current | @as(u32, os.O.NONBLOCK);
    _ = try os.fcntl(fd, os.F.SETFL, updated);
}

/// Sets the `TCP_NODELAY` socket option on `fd` to 1.
/// Propagates any error from `setsockopt`.
fn setNoDelay(fd: i32) !void {
    const enabled: c_int = 1;
    const option_bytes = std.mem.asBytes(&enabled);
    try os.setsockopt(fd, os.IPPROTO.TCP, os.TCP.NODELAY, option_bytes);
}

Connection Pool実装

/// Fixed-capacity pool of `Connection` slots, each with its own pre-allocated
/// 8192-byte buffer.
///
/// Free slots are tracked by index in `free_list`; `used_map` caches the
/// fd -> slot index mapping and is filled lazily by `findByFd`. `acquire`,
/// `release` and `findByFd` take `mutex`; `timeoutCheck` and `deinit` iterate
/// the slots without holding it.
pub const ConnectionPool = struct {
    connections: []Connection,
    free_list: std.ArrayList(usize),
    used_map: std.AutoHashMap(i32, usize),
    mutex: std.Thread.Mutex,
    buffer_allocator: std.mem.Allocator,

    /// One pooled connection slot. `fd` is -1 while the slot is free.
    pub const Connection = struct {
        fd: i32,
        state: State,
        buffer: []u8,
        read_offset: usize,
        write_offset: usize,
        last_activity: i64,

        pub const State = enum {
            idle,
            reading,
            processing,
            writing,
            closing,
        };
    };

    /// Allocates `capacity` slots and one buffer per slot using `allocator`.
    /// On failure everything allocated so far is freed.
    pub fn init(allocator: std.mem.Allocator, capacity: usize) !ConnectionPool {
        const connections = try allocator.alloc(Connection, capacity);
        errdefer allocator.free(connections);

        var free_list = try std.ArrayList(usize).initCapacity(allocator, capacity);
        errdefer free_list.deinit();

        var used_map = std.AutoHashMap(i32, usize).init(allocator);
        errdefer used_map.deinit();

        // Number of slots whose buffer has been allocated so far; lets the
        // error path free exactly those buffers.
        var buffers_allocated: usize = 0;
        errdefer {
            for (connections[0..buffers_allocated]) |conn| {
                allocator.free(conn.buffer);
            }
        }

        // Pre-allocate buffers
        for (connections, 0..) |*conn, i| {
            conn.* = .{
                .fd = -1,
                .state = .idle,
                .buffer = try allocator.alloc(u8, 8192),
                .read_offset = 0,
                .write_offset = 0,
                .last_activity = 0,
            };
            buffers_allocated += 1;

            // Capacity was reserved above, so this cannot fail.
            free_list.appendAssumeCapacity(i);
        }

        return ConnectionPool{
            .connections = connections,
            .free_list = free_list,
            .used_map = used_map,
            .mutex = std.Thread.Mutex{},
            .buffer_allocator = allocator,
        };
    }

    /// Takes a free slot, or returns null when none is left. The slot is
    /// returned as it was left by `init`/`release`; the caller fills in `fd`
    /// and `state`.
    pub fn acquire(self: *ConnectionPool) ?*Connection {
        self.mutex.lock();
        defer self.mutex.unlock();

        if (self.free_list.popOrNull()) |idx| {
            const conn = &self.connections[idx];
            return conn;
        }
        return null;
    }

    /// Resets `conn`, drops its fd from the lookup cache and puts the slot
    /// back on the free list. Does not close the file descriptor.
    /// `conn` must point into this pool's `connections` slice.
    pub fn release(self: *ConnectionPool, conn: *Connection) void {
        self.mutex.lock();
        defer self.mutex.unlock();

        _ = self.used_map.remove(conn.fd);

        const idx = (@intFromPtr(conn) - @intFromPtr(self.connections.ptr)) /
            @sizeOf(Connection);

        conn.fd = -1;
        conn.state = .idle;
        conn.read_offset = 0;
        conn.write_offset = 0;

        // A failure here would make the slot permanently unavailable, so it
        // is reported instead of being dropped silently.
        self.free_list.append(idx) catch |err| {
            std.log.err(
                "Failed to return connection slot {d} to the free list: {s}",
                .{ idx, @errorName(err) },
            );
        };
    }

    /// Returns the slot currently bound to `fd`, or null. Negative values
    /// never match: -1 is the marker stored in free slots.
    pub fn findByFd(self: *ConnectionPool, fd: i32) ?*Connection {
        if (fd < 0) return null;

        self.mutex.lock();
        defer self.mutex.unlock();

        if (self.used_map.get(fd)) |idx| {
            return &self.connections[idx];
        }

        // Slow path: linear search (for newly accepted connections)
        for (self.connections, 0..) |*conn, idx| {
            if (conn.fd == fd) {
                // The map is only a lookup cache; if the insert fails the next
                // lookup simply takes the slow path again.
                self.used_map.put(fd, idx) catch {};
                return conn;
            }
        }

        return null;
    }

    /// Closes and releases every non-idle slot whose `last_activity` is more
    /// than `timeout_ms` milliseconds in the past.
    ///
    /// The descriptor is closed here because `release` overwrites `fd` with
    /// -1, after which the socket can no longer be found through the pool.
    pub fn timeoutCheck(self: *ConnectionPool, timeout_ms: i64) void {
        const now = std.time.milliTimestamp();

        for (self.connections) |*conn| {
            if (conn.state != .idle and
                now - conn.last_activity > timeout_ms)
            {
                std.log.warn("Connection timeout: fd={}", .{conn.fd});
                if (conn.fd >= 0) std.os.close(conn.fd);
                self.release(conn);
            }
        }
    }

    /// Frees all buffers, the slot array, the free list and the lookup map.
    /// Does not close any file descriptor.
    pub fn deinit(self: *ConnectionPool) void {
        for (self.connections) |*conn| {
            self.buffer_allocator.free(conn.buffer);
        }
        self.buffer_allocator.free(self.connections);
        self.free_list.deinit();
        self.used_map.deinit();
    }
};

Phase 2: HTTP Server実装

基本的なHTTPパーサー

/// Parsed view of an HTTP request.
///
/// `path`, `version`, `body` and all header keys and values are slices into
/// the buffer passed to `parse`; nothing is copied, so that buffer must
/// outlive the request. Header names are stored exactly as received, which
/// makes lookups case-sensitive.
pub const HttpRequest = struct {
    method: Method,
    path: []const u8,
    version: []const u8,
    headers: std.StringHashMap([]const u8),
    body: []const u8,

    /// Request methods recognised by the parser.
    pub const Method = enum {
        GET,
        POST,
        PUT,
        DELETE,
        HEAD,
        OPTIONS,
        PATCH,

        /// Maps an exact, upper-case method token to its enum value.
        /// Returns null for anything else.
        pub fn parse(s: []const u8) ?Method {
            return std.meta.stringToEnum(Method, s);
        }
    };

    /// Parses `buffer` as a request line, header lines and a body, with lines
    /// separated by CRLF.
    ///
    /// Header lines without a colon are skipped. Everything after the first
    /// empty line is returned as the body. The header map is allocated with
    /// `allocator`; release it with `deinit`.
    ///
    /// Errors: `InvalidRequest`, `InvalidMethod`, `InvalidPath`,
    /// `InvalidVersion` for a malformed request line, `UnknownMethod` for an
    /// unrecognised method, plus allocation failures from the header map.
    pub fn parse(
        allocator: std.mem.Allocator,
        buffer: []const u8,
    ) !HttpRequest {
        var line_iter = std.mem.splitSequence(u8, buffer, "\r\n");

        // Request line: METHOD SP PATH SP VERSION
        const request_line = line_iter.next() orelse return error.InvalidRequest;
        var tokens = std.mem.splitScalar(u8, request_line, ' ');

        const method_token = tokens.next() orelse return error.InvalidMethod;
        const target = tokens.next() orelse return error.InvalidPath;
        const http_version = tokens.next() orelse return error.InvalidVersion;

        const method = Method.parse(method_token) orelse return error.UnknownMethod;

        var header_map = std.StringHashMap([]const u8).init(allocator);
        errdefer header_map.deinit();

        // Header lines run until the first empty line.
        while (line_iter.next()) |line| {
            if (line.len == 0) break;

            if (std.mem.indexOfScalar(u8, line, ':')) |sep| {
                const name = std.mem.trim(u8, line[0..sep], " \t");
                const val = std.mem.trim(u8, line[sep + 1 ..], " \t");
                try header_map.put(name, val);
            }
        }

        return HttpRequest{
            .method = method,
            .path = target,
            .version = http_version,
            .headers = header_map,
            // Whatever follows the header terminator is the body.
            .body = line_iter.rest(),
        };
    }

    /// Frees the header map. The slices it referenced are not owned by the
    /// request and are left alone.
    pub fn deinit(self: *HttpRequest) void {
        self.headers.deinit();
    }
};

/// An HTTP response ready to be serialized with `format`.
///
/// The struct borrows `status_text`, `body` and the header strings; it does
/// not free any of them.
pub const HttpResponse = struct {
    status: u16,
    status_text: []const u8,
    headers: std.StringHashMap([]const u8),
    body: []const u8,

    /// Serializes the response as an HTTP/1.1 message: status line, headers,
    /// an empty line, then the body.
    ///
    /// A `Content-Length` header with the body length is appended unless the
    /// header map already contains one. The check ignores ASCII case, so a
    /// caller-supplied `content-length` does not produce a duplicate header.
    /// Headers are written in the map's iteration order.
    ///
    /// Caller owns the returned slice and frees it with `allocator`.
    pub fn format(
        self: HttpResponse,
        allocator: std.mem.Allocator,
    ) ![]u8 {
        var list = std.ArrayList(u8).init(allocator);
        errdefer list.deinit();
        const writer = list.writer();

        // Status line
        try writer.print(
            "HTTP/1.1 {} {s}\r\n",
            .{ self.status, self.status_text },
        );

        // Headers
        var has_content_length = false;
        var iter = self.headers.iterator();
        while (iter.next()) |entry| {
            if (std.ascii.eqlIgnoreCase(entry.key_ptr.*, "Content-Length")) {
                has_content_length = true;
            }
            try writer.print(
                "{s}: {s}\r\n",
                .{ entry.key_ptr.*, entry.value_ptr.* },
            );
        }

        // Content-Length (if not already set)
        if (!has_content_length) {
            try writer.print(
                "Content-Length: {}\r\n",
                .{self.body.len},
            );
        }

        // End of headers
        try list.appendSlice("\r\n");

        // Body
        try list.appendSlice(self.body);

        return list.toOwnedSlice();
    }
};

HTTP Server with Router

/// HTTP front end that bundles an `EchoServer`, an exact-match `Router` and
/// request `Statistics`.
///
/// NOTE(review): nothing in this struct connects the router to the event
/// loop of `echo_server`; that wiring is not visible here.
pub const HttpServer = struct {
    allocator: std.mem.Allocator,
    echo_server: EchoServer,
    router: Router,
    static_dir: []const u8,
    stats: Statistics,

    const Self = @This();

    /// Maps request paths to handler functions by exact string match.
    pub const Router = struct {
        routes: std.StringHashMap(HandlerFn),

        /// Signature every route handler must have.
        pub const HandlerFn = *const fn (
            *HttpServer,
            *HttpRequest,
        ) anyerror!HttpResponse;

        /// Creates an empty router whose table allocates from `allocator`.
        pub fn init(allocator: std.mem.Allocator) Router {
            return .{
                .routes = std.StringHashMap(HandlerFn).init(allocator),
            };
        }

        /// Registers `handler` for `path`, replacing any previous handler.
        /// `path` is stored by reference, not copied, so it must outlive the
        /// router.
        pub fn addRoute(
            self: *Router,
            path: []const u8,
            handler: HandlerFn,
        ) !void {
            try self.routes.put(path, handler);
        }

        /// Returns the handler registered for exactly `path`, or null.
        pub fn match(self: *Router, path: []const u8) ?HandlerFn {
            return self.routes.get(path);
        }

        /// Frees the route table.
        pub fn deinit(self: *Router) void {
            self.routes.deinit();
        }
    };

    /// Atomic request and traffic counters plus the creation timestamp.
    pub const Statistics = struct {
        requests_total: std.atomic.Value(u64),
        requests_ok: std.atomic.Value(u64),
        requests_error: std.atomic.Value(u64),
        bytes_received: std.atomic.Value(u64),
        bytes_sent: std.atomic.Value(u64),
        start_time: i64,

        /// Zeroes all counters and records the current time in milliseconds.
        pub fn init() Statistics {
            return .{
                .requests_total = std.atomic.Value(u64).init(0),
                .requests_ok = std.atomic.Value(u64).init(0),
                .requests_error = std.atomic.Value(u64).init(0),
                .bytes_received = std.atomic.Value(u64).init(0),
                .bytes_sent = std.atomic.Value(u64).init(0),
                .start_time = std.time.milliTimestamp(),
            };
        }

        /// Counts one request, as successful when `ok` is true and as failed
        /// otherwise.
        pub fn recordRequest(self: *Statistics, ok: bool) void {
            _ = self.requests_total.fetchAdd(1, .monotonic);
            if (ok) {
                _ = self.requests_ok.fetchAdd(1, .monotonic);
            } else {
                _ = self.requests_error.fetchAdd(1, .monotonic);
            }
        }
    };

    /// Creates the underlying server on `port` (pool of 10000 connections)
    /// and registers the built-in `/` and `/stats` routes.
    ///
    /// `static_dir` is stored by reference. If route registration fails, the
    /// server and router created so far are released.
    pub fn init(
        allocator: std.mem.Allocator,
        port: u16,
        static_dir: []const u8,
    ) !Self {
        var echo_server = try EchoServer.init(allocator, port, 10000);
        errdefer echo_server.deinit();

        var router = Router.init(allocator);
        errdefer router.deinit();

        // Register routes
        try router.addRoute("/", handleIndex);
        try router.addRoute("/stats", handleStats);

        return Self{
            .allocator = allocator,
            .echo_server = echo_server,
            .router = router,
            .static_dir = static_dir,
            .stats = Statistics.init(),
        };
    }

    /// Handler for `/`: returns a static HTML page.
    ///
    /// The body is a string literal and must not be freed. The header map is
    /// allocated from `self.allocator`; the caller must `deinit` it.
    /// The return type is spelled out as `anyerror!` to match `HandlerFn`.
    fn handleIndex(
        self: *HttpServer,
        request: *HttpRequest,
    ) anyerror!HttpResponse {
        _ = request;

        const body =
            \\<!DOCTYPE html>
            \\<html>
            \\<head><title>Zig HTTP Server</title></head>
            \\<body>
            \\  <h1>Welcome to Zig HTTP Server</h1>
            \\  <p>Built with io_uring and love!</p>
            \\  <a href="/stats">Statistics</a>
            \\</body>
            \\</html>
        ;

        var headers = std.StringHashMap([]const u8).init(self.allocator);
        errdefer headers.deinit();
        try headers.put("Content-Type", "text/html; charset=utf-8");

        return HttpResponse{
            .status = 200,
            .status_text = "OK",
            .headers = headers,
            .body = body,
        };
    }

    /// Handler for `/stats`: returns the current counters as a JSON document.
    ///
    /// Both the body and the header map are allocated from `self.allocator`;
    /// the caller must free the body and `deinit` the headers. On error
    /// neither allocation is leaked.
    /// The return type is spelled out as `anyerror!` to match `HandlerFn`.
    fn handleStats(
        self: *HttpServer,
        request: *HttpRequest,
    ) anyerror!HttpResponse {
        _ = request;

        const total = self.stats.requests_total.load(.monotonic);
        const ok = self.stats.requests_ok.load(.monotonic);
        const err = self.stats.requests_error.load(.monotonic);
        const rx = self.stats.bytes_received.load(.monotonic);
        const tx = self.stats.bytes_sent.load(.monotonic);

        const uptime = std.time.milliTimestamp() - self.stats.start_time;

        const body = try std.fmt.allocPrint(
            self.allocator,
            \\{{
            \\  "uptime_ms": {},
            \\  "requests": {{
            \\    "total": {},
            \\    "ok": {},
            \\    "error": {}
            \\  }},
            \\  "traffic": {{
            \\    "bytes_received": {},
            \\    "bytes_sent": {}
            \\  }}
            \\}}
        ,
            .{ uptime, total, ok, err, rx, tx },
        );
        errdefer self.allocator.free(body);

        var headers = std.StringHashMap([]const u8).init(self.allocator);
        errdefer headers.deinit();
        try headers.put("Content-Type", "application/json");

        return HttpResponse{
            .status = 200,
            .status_text = "OK",
            .headers = headers,
            .body = body,
        };
    }

    /// Releases the router and the underlying server.
    pub fn deinit(self: *Self) void {
        self.router.deinit();
        self.echo_server.deinit();
    }
};

Phase 3: Load Balancing

Round-Robin Load Balancer

/// Distributes connections across a fixed set of workers using one of
/// several selection strategies.
///
/// NOTE(review): `ConnectionQueue` and `Connection` are not defined in this
/// block; they are used exactly as the call sites show.
pub const LoadBalancer = struct {
    workers: []Worker,
    next: std.atomic.Value(usize),
    strategy: Strategy,

    /// How `dispatch` picks the target worker.
    pub const Strategy = enum {
        round_robin,
        least_connections,
        least_response_time,
        random,
    };

    /// Per-worker queue and counters.
    pub const Worker = struct {
        id: usize,
        queue: ConnectionQueue,
        active_connections: std.atomic.Value(usize),
        total_response_time_ms: std.atomic.Value(u64),
        total_requests: std.atomic.Value(u64),

        /// Mean response time in milliseconds, or 0.0 when the worker has not
        /// recorded any request yet.
        pub fn avgResponseTime(self: *Worker) f64 {
            const total_time = self.total_response_time_ms.load(.monotonic);
            const total_reqs = self.total_requests.load(.monotonic);
            if (total_reqs == 0) return 0.0;
            return @as(f64, @floatFromInt(total_time)) /
                @as(f64, @floatFromInt(total_reqs));
        }
    };

    /// Allocates `num_workers` workers with zeroed counters.
    ///
    /// Returns `error.NoWorkers` when `num_workers` is 0, because every
    /// selection strategy needs at least one worker. The worker slice is
    /// allocated from `allocator` and is freed again if initialization fails.
    pub fn init(
        allocator: std.mem.Allocator,
        num_workers: usize,
        strategy: Strategy,
    ) !LoadBalancer {
        if (num_workers == 0) return error.NoWorkers;

        const workers = try allocator.alloc(Worker, num_workers);
        errdefer allocator.free(workers);

        // NOTE(review): queues created before a failing iteration are not
        // torn down here because ConnectionQueue's cleanup API is not visible.
        for (workers, 0..) |*worker, i| {
            worker.* = .{
                .id = i,
                .queue = try ConnectionQueue.init(allocator),
                .active_connections = std.atomic.Value(usize).init(0),
                .total_response_time_ms = std.atomic.Value(u64).init(0),
                .total_requests = std.atomic.Value(u64).init(0),
            };
        }

        return LoadBalancer{
            .workers = workers,
            .next = std.atomic.Value(usize).init(0),
            .strategy = strategy,
        };
    }

    /// Picks a worker according to `strategy`, pushes `conn` onto its queue
    /// and then increments that worker's active-connection counter.
    /// Propagates any error from the queue push.
    pub fn dispatch(self: *LoadBalancer, conn: Connection) !void {
        const worker = switch (self.strategy) {
            .round_robin => self.selectRoundRobin(),
            .least_connections => self.selectLeastConnections(),
            .least_response_time => self.selectLeastResponseTime(),
            .random => self.selectRandom(),
        };

        try worker.queue.push(conn);
        _ = worker.active_connections.fetchAdd(1, .monotonic);
    }

    /// Cycles through the workers in index order using the shared counter.
    fn selectRoundRobin(self: *LoadBalancer) *Worker {
        const idx = self.next.fetchAdd(1, .monotonic) % self.workers.len;
        return &self.workers[idx];
    }

    /// Returns the worker with the fewest active connections; the lowest
    /// index wins ties.
    fn selectLeastConnections(self: *LoadBalancer) *Worker {
        var min_conn: usize = std.math.maxInt(usize);
        var selected: *Worker = &self.workers[0];

        for (self.workers) |*worker| {
            const conns = worker.active_connections.load(.monotonic);
            if (conns < min_conn) {
                min_conn = conns;
                selected = worker;
            }
        }

        return selected;
    }

    /// Returns the worker with the lowest average response time; the lowest
    /// index wins ties. A worker with no recorded requests reports 0.0 and is
    /// therefore preferred.
    fn selectLeastResponseTime(self: *LoadBalancer) *Worker {
        var min_time: f64 = std.math.floatMax(f64);
        var selected: *Worker = &self.workers[0];

        for (self.workers) |*worker| {
            const avg = worker.avgResponseTime();
            if (avg < min_time) {
                min_time = avg;
                selected = worker;
            }
        }

        return selected;
    }

    /// Returns a uniformly chosen worker.
    ///
    /// Uses the standard library's shared random source rather than a PRNG
    /// reseeded from the clock on each call, which would return the same
    /// worker for every call within one millisecond.
    fn selectRandom(self: *LoadBalancer) *Worker {
        const idx = std.crypto.random.uintLessThan(usize, self.workers.len);
        return &self.workers[idx];
    }
};

Real-world Case Studies

Case Study 1: Bun.js Server

Bun.jsは、Zigで実装された高速JavaScriptランタイムです。

アーキテクチャの特徴:

  • io_uringベースのイベントループ
  • Zero-copy HTTP parsing
  • Multi-threaded with worker pool
  • Optimized for real-world web workloads

性能数値:

Benchmark: "Hello World" HTTP server

Bun.js (Zig + io_uring):     250,000 req/s
Node.js (libuv):              90,000 req/s
Deno (Rust + Tokio):        120,000 req/s

Result: Bun is roughly 2.8x faster than Node.js (250,000 / 90,000 ≈ 2.78)

Case Study 2: TigerBeetle Database

金融取引用の高性能データベース。

ネットワーク層の特徴:

  • Fixed connection pool with pre-allocated buffers
  • Batched request processing
  • Zero-copy message passing
  • Deterministic performance

最適化テクニック:

// TigerBeetle-style batching
/// A group of messages that is processed together.
/// Only the first `count` entries of `messages` are considered part of the
/// batch.
pub const MessageBatch = struct {
    messages: []Message,
    count: usize,

    // Not referenced anywhere inside this struct.
    const BATCH_SIZE = 256;

    /// Calls `processMessage` on each of the first `count` messages, in
    /// order, and returns the first error encountered.
    ///
    /// `count` must not exceed `messages.len`.
    pub fn processBatch(batch: *MessageBatch) !void {
        const pending = batch.messages[0..batch.count];
        for (pending) |*msg| try processMessage(msg);
    }
};

まとめ

このExplanationでは、実践的なネットワークサーバー実装について詳しく学びました:

  • TCPサーバー: Connection pool、epollベースのイベントループ
  • HTTPサーバー: Request parsing、routing、statistics
  • ロードバランシング: 複数の戦略、worker selection
  • 実世界の事例: Bun.js、TigerBeetleの最適化

次のエクササイズでは、これらの知識を使って独自のサーバーを構築します。