Explanation 04: Network Server Design
完全なサーバー実装
このセクションでは、本格的なネットワークサーバーを段階的に構築していきます。
Phase 1: 基本的なTCPサーバー
Echo Server with Connection Pool
const std = @import("std");
const net = std.net;
const os = std.os;
const linux = os.linux;
pub const EchoServer = struct {
allocator: std.mem.Allocator,
address: net.Address,
listener: net.Server,
pool: ConnectionPool,
shutdown: GracefulShutdown,
epoll_fd: i32,
const Self = @This();
pub fn init(
allocator: std.mem.Allocator,
port: u16,
max_connections: usize,
) !Self {
const address = try net.Address.parseIp("0.0.0.0", port);
const listener = try address.listen(.{
.reuse_address = true,
.reuse_port = true,
});
// Create epoll instance
const epoll_fd = try os.epoll_create1(linux.EPOLL.CLOEXEC);
errdefer os.close(epoll_fd);
// Add listener to epoll
var event = linux.epoll_event{
.events = linux.EPOLL.IN,
.data = .{ .fd = listener.stream.handle },
};
try os.epoll_ctl(
epoll_fd,
linux.EPOLL.CTL_ADD,
listener.stream.handle,
&event,
);
return Self{
.allocator = allocator,
.address = address,
.listener = listener,
.pool = try ConnectionPool.init(allocator, max_connections),
.shutdown = GracefulShutdown.init(30000), // 30s timeout
.epoll_fd = epoll_fd,
};
}
pub fn run(self: *Self) !void {
std.log.info("Echo server listening on {}", .{self.address});
var events: [128]linux.epoll_event = undefined;
while (self.shutdown.shouldAcceptConnections()) {
const n = os.epoll_wait(self.epoll_fd, &events, -1);
for (events[0..@intCast(n)]) |event| {
if (event.data.fd == self.listener.stream.handle) {
// Accept new connection
self.acceptConnection() catch |err| {
std.log.err("Failed to accept connection: {}", .{err});
};
} else {
// Handle existing connection
self.handleConnection(event.data.fd) catch |err| {
std.log.err("Error handling connection: {}", .{err});
self.closeConnection(event.data.fd);
};
}
}
// Timeout check every iteration
self.pool.timeoutCheck(60000); // 60s timeout
}
// Graceful shutdown
try self.shutdown.wait();
std.log.info("Server shutdown complete", .{});
}
fn acceptConnection(self: *Self) !void {
if (!self.shutdown.shouldAcceptConnections()) {
return;
}
const conn = try self.listener.accept();
// Configure socket
try setNonBlocking(conn.stream.handle);
try setNoDelay(conn.stream.handle);
// Acquire connection from pool
const pool_conn = self.pool.acquire() orelse {
std.log.warn("Connection pool full, rejecting connection", .{});
conn.stream.close();
return;
};
pool_conn.fd = conn.stream.handle;
pool_conn.state = .reading;
pool_conn.read_offset = 0;
pool_conn.write_offset = 0;
pool_conn.last_activity = std.time.milliTimestamp();
// Add to epoll
var event = linux.epoll_event{
.events = linux.EPOLL.IN | linux.EPOLL.ET, // Edge-triggered
.data = .{ .fd = conn.stream.handle },
};
try os.epoll_ctl(
self.epoll_fd,
linux.EPOLL.CTL_ADD,
conn.stream.handle,
&event,
);
self.shutdown.onConnectionAccepted();
std.log.info("New connection accepted: fd={}", .{conn.stream.handle});
}
fn handleConnection(self: *Self, fd: i32) !void {
const conn = self.pool.findByFd(fd) orelse return error.ConnectionNotFound;
conn.last_activity = std.time.milliTimestamp();
switch (conn.state) {
.reading => try self.handleRead(conn),
.writing => try self.handleWrite(conn),
else => {},
}
}
fn handleRead(self: *Self, conn: *ConnectionPool.Connection) !void {
while (true) {
const n = os.read(
conn.fd,
conn.buffer[conn.read_offset..],
) catch |err| {
if (err == error.WouldBlock) return;
return err;
};
if (n == 0) {
// Connection closed by peer
self.closeConnection(conn.fd);
return;
}
conn.read_offset += n;
// Echo: copy to write buffer
conn.write_offset = conn.read_offset;
conn.read_offset = 0;
conn.state = .writing;
// Modify epoll to watch for EPOLLOUT
var event = linux.epoll_event{
.events = linux.EPOLL.OUT | linux.EPOLL.ET,
.data = .{ .fd = conn.fd },
};
try os.epoll_ctl(
self.epoll_fd,
linux.EPOLL.CTL_MOD,
conn.fd,
&event,
);
return; // Let epoll notify us when ready to write
}
}
fn handleWrite(self: *Self, conn: *ConnectionPool.Connection) !void {
var written: usize = 0;
while (written < conn.write_offset) {
const n = os.write(
conn.fd,
conn.buffer[written..conn.write_offset],
) catch |err| {
if (err == error.WouldBlock) {
// Shift remaining data to start of buffer
std.mem.copyForwards(
u8,
conn.buffer[0..],
conn.buffer[written..conn.write_offset],
);
conn.write_offset -= written;
return;
}
return err;
};
written += n;
}
// All data written, switch back to reading
conn.write_offset = 0;
conn.state = .reading;
var event = linux.epoll_event{
.events = linux.EPOLL.IN | linux.EPOLL.ET,
.data = .{ .fd = conn.fd },
};
try os.epoll_ctl(
self.epoll_fd,
linux.EPOLL.CTL_MOD,
conn.fd,
&event,
);
}
fn closeConnection(self: *Self, fd: i32) void {
if (self.pool.findByFd(fd)) |conn| {
os.close(fd);
self.pool.release(conn);
self.shutdown.onConnectionClosed();
std.log.info("Connection closed: fd={}", .{fd});
}
}
pub fn deinit(self: *Self) void {
os.close(self.epoll_fd);
self.listener.deinit();
self.pool.deinit();
}
};
fn setNonBlocking(fd: i32) !void {
const flags = try os.fcntl(fd, os.F.GETFL, 0);
_ = try os.fcntl(fd, os.F.SETFL, flags | @as(u32, os.O.NONBLOCK));
}
fn setNoDelay(fd: i32) !void {
try os.setsockopt(
fd,
os.IPPROTO.TCP,
os.TCP.NODELAY,
&std.mem.toBytes(@as(c_int, 1)),
);
}
Connection Pool実装
pub const ConnectionPool = struct {
connections: []Connection,
free_list: std.ArrayList(usize),
used_map: std.AutoHashMap(i32, usize),
mutex: std.Thread.Mutex,
buffer_allocator: std.mem.Allocator,
pub const Connection = struct {
fd: i32,
state: State,
buffer: []u8,
read_offset: usize,
write_offset: usize,
last_activity: i64,
pub const State = enum {
idle,
reading,
processing,
writing,
closing,
};
};
pub fn init(allocator: std.mem.Allocator, capacity: usize) !ConnectionPool {
var connections = try allocator.alloc(Connection, capacity);
errdefer allocator.free(connections);
var free_list = try std.ArrayList(usize).initCapacity(allocator, capacity);
errdefer free_list.deinit();
var used_map = std.AutoHashMap(i32, usize).init(allocator);
errdefer used_map.deinit();
// Pre-allocate buffers
for (connections, 0..) |*conn, i| {
conn.fd = -1;
conn.state = .idle;
conn.buffer = try allocator.alloc(u8, 8192);
conn.read_offset = 0;
conn.write_offset = 0;
conn.last_activity = 0;
try free_list.append(i);
}
return ConnectionPool{
.connections = connections,
.free_list = free_list,
.used_map = used_map,
.mutex = std.Thread.Mutex{},
.buffer_allocator = allocator,
};
}
pub fn acquire(self: *ConnectionPool) ?*Connection {
self.mutex.lock();
defer self.mutex.unlock();
if (self.free_list.popOrNull()) |idx| {
const conn = &self.connections[idx];
return conn;
}
return null;
}
pub fn release(self: *ConnectionPool, conn: *Connection) void {
self.mutex.lock();
defer self.mutex.unlock();
_ = self.used_map.remove(conn.fd);
const idx = (@intFromPtr(conn) - @intFromPtr(self.connections.ptr)) /
@sizeOf(Connection);
conn.fd = -1;
conn.state = .idle;
conn.read_offset = 0;
conn.write_offset = 0;
self.free_list.append(idx) catch {};
}
pub fn findByFd(self: *ConnectionPool, fd: i32) ?*Connection {
self.mutex.lock();
defer self.mutex.unlock();
if (self.used_map.get(fd)) |idx| {
return &self.connections[idx];
}
// Slow path: linear search (for newly accepted connections)
for (self.connections) |*conn| {
if (conn.fd == fd) {
const idx = (@intFromPtr(conn) - @intFromPtr(self.connections.ptr)) /
@sizeOf(Connection);
self.used_map.put(fd, idx) catch {};
return conn;
}
}
return null;
}
pub fn timeoutCheck(self: *ConnectionPool, timeout_ms: i64) void {
const now = std.time.milliTimestamp();
for (self.connections) |*conn| {
if (conn.state != .idle and
now - conn.last_activity > timeout_ms)
{
std.log.warn("Connection timeout: fd={}", .{conn.fd});
self.release(conn);
}
}
}
pub fn deinit(self: *ConnectionPool) void {
for (self.connections) |*conn| {
self.buffer_allocator.free(conn.buffer);
}
self.buffer_allocator.free(self.connections);
self.free_list.deinit();
self.used_map.deinit();
}
};
Phase 2: HTTP Server実装
基本的なHTTPパーサー
pub const HttpRequest = struct {
method: Method,
path: []const u8,
version: []const u8,
headers: std.StringHashMap([]const u8),
body: []const u8,
pub const Method = enum {
GET,
POST,
PUT,
DELETE,
HEAD,
OPTIONS,
PATCH,
pub fn parse(s: []const u8) ?Method {
if (std.mem.eql(u8, s, "GET")) return .GET;
if (std.mem.eql(u8, s, "POST")) return .POST;
if (std.mem.eql(u8, s, "PUT")) return .PUT;
if (std.mem.eql(u8, s, "DELETE")) return .DELETE;
if (std.mem.eql(u8, s, "HEAD")) return .HEAD;
if (std.mem.eql(u8, s, "OPTIONS")) return .OPTIONS;
if (std.mem.eql(u8, s, "PATCH")) return .PATCH;
return null;
}
};
pub fn parse(
allocator: std.mem.Allocator,
buffer: []const u8,
) !HttpRequest {
var headers = std.StringHashMap([]const u8).init(allocator);
errdefer headers.deinit();
var lines = std.mem.splitSequence(u8, buffer, "\r\n");
// Parse request line
const request_line = lines.next() orelse return error.InvalidRequest;
var parts = std.mem.splitScalar(u8, request_line, ' ');
const method_str = parts.next() orelse return error.InvalidMethod;
const path = parts.next() orelse return error.InvalidPath;
const version = parts.next() orelse return error.InvalidVersion;
const method = Method.parse(method_str) orelse return error.UnknownMethod;
// Parse headers
while (lines.next()) |line| {
if (line.len == 0) break; // End of headers
const colon = std.mem.indexOf(u8, line, ":") orelse continue;
const key = std.mem.trim(u8, line[0..colon], " \t");
const value = std.mem.trim(u8, line[colon + 1 ..], " \t");
try headers.put(key, value);
}
// Remaining is body
const body = lines.rest();
return HttpRequest{
.method = method,
.path = path,
.version = version,
.headers = headers,
.body = body,
};
}
pub fn deinit(self: *HttpRequest) void {
self.headers.deinit();
}
};
pub const HttpResponse = struct {
status: u16,
status_text: []const u8,
headers: std.StringHashMap([]const u8),
body: []const u8,
pub fn format(
self: HttpResponse,
allocator: std.mem.Allocator,
) ![]u8 {
var list = std.ArrayList(u8).init(allocator);
errdefer list.deinit();
// Status line
try list.writer().print(
"HTTP/1.1 {} {s}\r\n",
.{ self.status, self.status_text },
);
// Headers
var iter = self.headers.iterator();
while (iter.next()) |entry| {
try list.writer().print(
"{s}: {s}\r\n",
.{ entry.key_ptr.*, entry.value_ptr.* },
);
}
// Content-Length (if not already set)
if (!self.headers.contains("Content-Length")) {
try list.writer().print(
"Content-Length: {}\r\n",
.{self.body.len},
);
}
// End of headers
try list.appendSlice("\r\n");
// Body
try list.appendSlice(self.body);
return list.toOwnedSlice();
}
};
HTTP Server with Router
pub const HttpServer = struct {
allocator: std.mem.Allocator,
echo_server: EchoServer,
router: Router,
static_dir: []const u8,
stats: Statistics,
const Self = @This();
pub const Router = struct {
routes: std.StringHashMap(HandlerFn),
pub const HandlerFn = *const fn (
*HttpServer,
*HttpRequest,
) anyerror!HttpResponse;
pub fn init(allocator: std.mem.Allocator) Router {
return .{
.routes = std.StringHashMap(HandlerFn).init(allocator),
};
}
pub fn addRoute(
self: *Router,
path: []const u8,
handler: HandlerFn,
) !void {
try self.routes.put(path, handler);
}
pub fn match(self: *Router, path: []const u8) ?HandlerFn {
return self.routes.get(path);
}
pub fn deinit(self: *Router) void {
self.routes.deinit();
}
};
pub const Statistics = struct {
requests_total: std.atomic.Value(u64),
requests_ok: std.atomic.Value(u64),
requests_error: std.atomic.Value(u64),
bytes_received: std.atomic.Value(u64),
bytes_sent: std.atomic.Value(u64),
start_time: i64,
pub fn init() Statistics {
return .{
.requests_total = std.atomic.Value(u64).init(0),
.requests_ok = std.atomic.Value(u64).init(0),
.requests_error = std.atomic.Value(u64).init(0),
.bytes_received = std.atomic.Value(u64).init(0),
.bytes_sent = std.atomic.Value(u64).init(0),
.start_time = std.time.milliTimestamp(),
};
}
pub fn recordRequest(self: *Statistics, ok: bool) void {
_ = self.requests_total.fetchAdd(1, .monotonic);
if (ok) {
_ = self.requests_ok.fetchAdd(1, .monotonic);
} else {
_ = self.requests_error.fetchAdd(1, .monotonic);
}
}
};
pub fn init(
allocator: std.mem.Allocator,
port: u16,
static_dir: []const u8,
) !Self {
var echo_server = try EchoServer.init(allocator, port, 10000);
var router = Router.init(allocator);
// Register routes
try router.addRoute("/", handleIndex);
try router.addRoute("/stats", handleStats);
return Self{
.allocator = allocator,
.echo_server = echo_server,
.router = router,
.static_dir = static_dir,
.stats = Statistics.init(),
};
}
fn handleIndex(
self: *HttpServer,
request: *HttpRequest,
) !HttpResponse {
_ = request;
_ = self;
const body =
\\<!DOCTYPE html>
\\<html>
\\<head><title>Zig HTTP Server</title></head>
\\<body>
\\ <h1>Welcome to Zig HTTP Server</h1>
\\ <p>Built with io_uring and love!</p>
\\ <a href="/stats">Statistics</a>
\\</body>
\\</html>
;
var headers = std.StringHashMap([]const u8).init(self.allocator);
try headers.put("Content-Type", "text/html; charset=utf-8");
return HttpResponse{
.status = 200,
.status_text = "OK",
.headers = headers,
.body = body,
};
}
fn handleStats(
self: *HttpServer,
request: *HttpRequest,
) !HttpResponse {
_ = request;
const total = self.stats.requests_total.load(.monotonic);
const ok = self.stats.requests_ok.load(.monotonic);
const err = self.stats.requests_error.load(.monotonic);
const rx = self.stats.bytes_received.load(.monotonic);
const tx = self.stats.bytes_sent.load(.monotonic);
const uptime = std.time.milliTimestamp() - self.stats.start_time;
const body = try std.fmt.allocPrint(
self.allocator,
\\{{
\\ "uptime_ms": {},
\\ "requests": {{
\\ "total": {},
\\ "ok": {},
\\ "error": {}
\\ }},
\\ "traffic": {{
\\ "bytes_received": {},
\\ "bytes_sent": {}
\\ }}
\\}}
,
.{ uptime, total, ok, err, rx, tx },
);
var headers = std.StringHashMap([]const u8).init(self.allocator);
try headers.put("Content-Type", "application/json");
return HttpResponse{
.status = 200,
.status_text = "OK",
.headers = headers,
.body = body,
};
}
pub fn deinit(self: *Self) void {
self.router.deinit();
self.echo_server.deinit();
}
};
Phase 3: Load Balancing
Round-Robin Load Balancer
pub const LoadBalancer = struct {
workers: []Worker,
next: std.atomic.Value(usize),
strategy: Strategy,
pub const Strategy = enum {
round_robin,
least_connections,
least_response_time,
random,
};
pub const Worker = struct {
id: usize,
queue: ConnectionQueue,
active_connections: std.atomic.Value(usize),
total_response_time_ms: std.atomic.Value(u64),
total_requests: std.atomic.Value(u64),
pub fn avgResponseTime(self: *Worker) f64 {
const total_time = self.total_response_time_ms.load(.monotonic);
const total_reqs = self.total_requests.load(.monotonic);
if (total_reqs == 0) return 0.0;
return @as(f64, @floatFromInt(total_time)) /
@as(f64, @floatFromInt(total_reqs));
}
};
pub fn init(
allocator: std.mem.Allocator,
num_workers: usize,
strategy: Strategy,
) !LoadBalancer {
var workers = try allocator.alloc(Worker, num_workers);
for (workers, 0..) |*worker, i| {
worker.* = .{
.id = i,
.queue = try ConnectionQueue.init(allocator),
.active_connections = std.atomic.Value(usize).init(0),
.total_response_time_ms = std.atomic.Value(u64).init(0),
.total_requests = std.atomic.Value(u64).init(0),
};
}
return LoadBalancer{
.workers = workers,
.next = std.atomic.Value(usize).init(0),
.strategy = strategy,
};
}
pub fn dispatch(self: *LoadBalancer, conn: Connection) !void {
const worker = switch (self.strategy) {
.round_robin => self.selectRoundRobin(),
.least_connections => self.selectLeastConnections(),
.least_response_time => self.selectLeastResponseTime(),
.random => self.selectRandom(),
};
try worker.queue.push(conn);
_ = worker.active_connections.fetchAdd(1, .monotonic);
}
fn selectRoundRobin(self: *LoadBalancer) *Worker {
const idx = self.next.fetchAdd(1, .monotonic) % self.workers.len;
return &self.workers[idx];
}
fn selectLeastConnections(self: *LoadBalancer) *Worker {
var min_conn: usize = std.math.maxInt(usize);
var selected: *Worker = &self.workers[0];
for (self.workers) |*worker| {
const conns = worker.active_connections.load(.monotonic);
if (conns < min_conn) {
min_conn = conns;
selected = worker;
}
}
return selected;
}
fn selectLeastResponseTime(self: *LoadBalancer) *Worker {
var min_time: f64 = std.math.floatMax(f64);
var selected: *Worker = &self.workers[0];
for (self.workers) |*worker| {
const avg = worker.avgResponseTime();
if (avg < min_time) {
min_time = avg;
selected = worker;
}
}
return selected;
}
fn selectRandom(self: *LoadBalancer) *Worker {
var prng = std.rand.DefaultPrng.init(@intCast(std.time.milliTimestamp()));
const idx = prng.random().intRangeAtMost(usize, 0, self.workers.len - 1);
return &self.workers[idx];
}
};
Real-world Case Studies
Case Study 1: Bun.js Server
Bun.jsは、Zigで実装された高速JavaScriptランタイムです。
アーキテクチャの特徴:
- io_uringベースのイベントループ
- Zero-copy HTTP parsing
- Multi-threaded with worker pool
- Optimized for real-world web workloads
性能数値:
Benchmark: "Hello World" HTTP server
Bun.js (Zig + io_uring): 250,000 req/s
Node.js (libuv): 90,000 req/s
Deno (Rust + Tokio): 120,000 req/s
Result: Bun is 2.7x faster than Node.js
Case Study 2: TigerBeetle Database
金融取引用の高性能データベース。
ネットワーク層の特徴:
- Fixed connection poolwith pre-allocated buffers
- Batched request processing
- Zero-copy message passing
- Deterministic performance
最適化テクニック:
// TigerBeetle-style batching
pub const MessageBatch = struct {
messages: []Message,
count: usize,
const BATCH_SIZE = 256;
pub fn processBatch(batch: *MessageBatch) !void {
// Process all messages in a single system call
for (batch.messages[0..batch.count]) |*msg| {
try processMessage(msg);
}
}
};
まとめ
このExplanationでは、実践的なネットワークサーバー実装について詳しく学びました:
- TCPサーバー: Connection pool、epollベースのイベントループ
- HTTPサーバー: Request parsing、routing、statistics
- ロードバランシング: 複数の戦略、worker selection
- 実世界の事例: Bun.js、TigerBeetleの最適化
次のエクササイズでは、これらの知識を使って独自のサーバーを構築します。