Rust Elective 04: データベースエンジン - RDBMS内部構造の実装

課題説明

概要

リレーショナルデータベース管理システム(RDBMS)は、構造化データを効率的に保存・検索するための基盤技術です。本課題では、Rustを使ってシンプルなデータベースエンジンを実装し、ストレージエンジン、インデックス、クエリ処理、トランザクションの基礎を学びます。

背景と動機

データベースの重要性:

  • 永続化: データの安全な保存と復旧
  • 並行制御: 複数ユーザーの同時アクセス
  • ACID特性: 原子性、一貫性、独立性、耐久性
  • 効率的検索: インデックスによる高速クエリ

Rustの優位性:

  • メモリ安全性(データの破損防止)
  • ゼロコストの抽象化(高性能)
  • 並行処理に強い(マルチスレッド安全)
  • ファイルI/Oの安全性
  • 課題要件

    以下の機能を持つデータベースエンジンを実装してください:

  • ストレージエンジン:
- ページベースのストレージ - ヒープファイル管理 - レコードの挿入・削除・更新 - バッファプール管理

  • B+ツリーインデックス:
- キーによる高速検索 - 範囲検索 - ノード分割と結合 - ページ管理

  • クエリ処理:
- 簡易SQLパーサー(SELECT、INSERT、UPDATE、DELETE) - クエリ実行計画 - テーブルスキャン - インデックススキャン

  • トランザクション:
- ACID特性の基本実装 - Write-Ahead Logging (WAL) - ロールバック - チェックポイント

  • データ型とスキーマ:
- 基本型(INT、VARCHAR、BOOL) - テーブル定義 - スキーマ管理 - 制約(PRIMARY KEY、UNIQUE)

制約条件

  • ページサイズは4KBとすること
  • B+ツリーのオーダーは可変にすること
  • クエリは基本的なSQLに対応すること
  • トランザクションログを実装すること
  • エラーハンドリングを適切に実装すること

---

想定解答

プロジェクト構造

rust_db/
├── Cargo.toml
├── src/
│   ├── main.rs
│   ├── lib.rs
│   ├── storage/
│   │   ├── mod.rs
│   │   ├── page.rs
│   │   ├── heap_file.rs
│   │   └── buffer_pool.rs
│   ├── index/
│   │   ├── mod.rs
│   │   └── btree.rs
│   ├── query/
│   │   ├── mod.rs
│   │   ├── parser.rs
│   │   ├── executor.rs
│   │   └── planner.rs
│   ├── transaction/
│   │   ├── mod.rs
│   │   ├── log.rs
│   │   └── manager.rs
│   └── schema/
│       ├── mod.rs
│       ├── table.rs
│       └── types.rs
├── tests/
│   └── integration_tests.rs
└── README.md

Cargo.toml

[package]
name = "rust_db"
version = "0.1.0"
edition = "2021"

[dependencies]
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
bincode = "1.3"
thiserror = "1.0"
anyhow = "1.0"
nom = "7.1"
log = "0.4"
env_logger = "0.11"

[dev-dependencies]
tempfile = "3.8"

src/storage/page.rs

use serde::{Deserialize, Serialize};
use std::io::{self, Read, Write};

pub const PAGE_SIZE: usize = 4096;

/// ページID
pub type PageId = u32;

/// ページヘッダー
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PageHeader {
    pub page_id: PageId,
    pub page_type: PageType,
    pub free_space_offset: u16,
    pub num_records: u16,
}

/// ページタイプ
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq)]
pub enum PageType {
    Data,     // データページ
    Index,    // インデックスページ
    Overflow, // オーバーフローページ
}

/// ページ構造
#[derive(Debug, Clone)]
pub struct Page {
    pub header: PageHeader,
    pub data: Vec<u8>,
}

impl Page {
    /// 新しいページを作成
    pub fn new(page_id: PageId, page_type: PageType) -> Self {
        let header = PageHeader {
            page_id,
            page_type,
            free_space_offset: std::mem::size_of::<PageHeader>() as u16,
            num_records: 0,
        };

        let data = vec![0u8; PAGE_SIZE - std::mem::size_of::<PageHeader>()];

        Page { header, data }
    }

    /// レコードを挿入
    pub fn insert_record(&mut self, record: &[u8]) -> Result<u16, String> {
        if record.len() > self.free_space() {
            return Err("Not enough space in page".to_string());
        }

        let slot_id = self.header.num_records;
        let offset = self.header.free_space_offset as usize;

        // レコードデータをコピー
        self.data[offset..offset + record.len()].copy_from_slice(record);

        // ヘッダー更新
        self.header.free_space_offset += record.len() as u16;
        self.header.num_records += 1;

        Ok(slot_id)
    }

    /// レコードを取得
    pub fn get_record(&self, slot_id: u16) -> Option<Vec<u8>> {
        if slot_id >= self.header.num_records {
            return None;
        }

        // 簡易実装:固定長レコードと仮定
        let record_size = 128; // 仮の固定サイズ
        let offset = slot_id as usize * record_size;

        if offset + record_size > self.data.len() {
            return None;
        }

        Some(self.data[offset..offset + record_size].to_vec())
    }

    /// 空き容量を返す
    pub fn free_space(&self) -> usize {
        PAGE_SIZE - self.header.free_space_offset as usize
    }

    /// ページをバイト列にシリアライズ
    pub fn to_bytes(&self) -> io::Result<Vec<u8>> {
        let mut bytes = Vec::with_capacity(PAGE_SIZE);

        // ヘッダーをシリアライズ
        let header_bytes = bincode::serialize(&self.header)
            .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
        bytes.write_all(&header_bytes)?;

        // データを書き込み
        bytes.write_all(&self.data)?;

        // ページサイズにパディング
        bytes.resize(PAGE_SIZE, 0);

        Ok(bytes)
    }

    /// バイト列からページを復元
    pub fn from_bytes(bytes: &[u8]) -> io::Result<Self> {
        if bytes.len() != PAGE_SIZE {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "Invalid page size",
            ));
        }

        // ヘッダーサイズを推定
        let header_size = std::mem::size_of::<PageHeader>();

        // ヘッダーをデシリアライズ
        let header: PageHeader = bincode::deserialize(&bytes[..header_size])
            .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;

        // データ部分を抽出
        let data = bytes[header_size..].to_vec();

        Ok(Page { header, data })
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_page_creation() {
        let page = Page::new(1, PageType::Data);
        assert_eq!(page.header.page_id, 1);
        assert_eq!(page.header.page_type, PageType::Data);
        assert!(page.free_space() > 0);
    }

    #[test]
    fn test_record_insert() {
        let mut page = Page::new(1, PageType::Data);
        let record = b"Hello, Database!";

        let slot_id = page.insert_record(record).unwrap();
        assert_eq!(slot_id, 0);
        assert_eq!(page.header.num_records, 1);
    }

    #[test]
    fn test_serialization() {
        let page = Page::new(42, PageType::Data);
        let bytes = page.to_bytes().unwrap();
        let restored = Page::from_bytes(&bytes).unwrap();

        assert_eq!(page.header.page_id, restored.header.page_id);
        assert_eq!(page.header.page_type, restored.header.page_type);
    }
}

src/index/btree.rs

use std::cmp::Ordering;
use serde::{Deserialize, Serialize};

/// B+ツリーのノード
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum BTreeNode {
    Internal(InternalNode),
    Leaf(LeafNode),
}

/// 内部ノード
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct InternalNode {
    pub keys: Vec<i64>,
    pub children: Vec<u32>, // PageId
}

/// リーフノード
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LeafNode {
    pub keys: Vec<i64>,
    pub values: Vec<Vec<u8>>,
    pub next: Option<u32>, // 次のリーフへのポインタ
}

/// B+ツリー
pub struct BTree {
    pub root: Option<u32>,
    pub order: usize, // 最大キー数
}

impl BTree {
    /// 新しいB+ツリーを作成
    pub fn new(order: usize) -> Self {
        BTree { root: None, order }
    }

    /// キーで検索
    pub fn search(&self, key: i64) -> Option<Vec<u8>> {
        // 簡易実装:実際にはページを読み込んで検索
        None
    }

    /// キーと値を挿入
    pub fn insert(&mut self, key: i64, value: Vec<u8>) -> Result<(), String> {
        // ルートが存在しない場合、新しいリーフノードを作成
        if self.root.is_none() {
            let leaf = LeafNode {
                keys: vec![key],
                values: vec![value],
                next: None,
            };
            // 実際にはページに書き込む
            self.root = Some(0);
            return Ok(());
        }

        // 実際の実装ではここで挿入とノード分割を行う
        Ok(())
    }

    /// 範囲検索
    pub fn range_search(&self, start: i64, end: i64) -> Vec<Vec<u8>> {
        // 実際にはリーフノードを順次走査
        Vec::new()
    }
}

impl LeafNode {
    /// キーを挿入(ソート済みを維持)
    pub fn insert(&mut self, key: i64, value: Vec<u8>) {
        let pos = self.keys.binary_search(&key).unwrap_or_else(|e| e);
        self.keys.insert(pos, key);
        self.values.insert(pos, value);
    }

    /// ノードが満杯か
    pub fn is_full(&self, order: usize) -> bool {
        self.keys.len() >= order
    }

    /// ノードを分割
    pub fn split(&mut self) -> LeafNode {
        let mid = self.keys.len() / 2;

        let right_keys = self.keys.split_off(mid);
        let right_values = self.values.split_off(mid);

        LeafNode {
            keys: right_keys,
            values: right_values,
            next: self.next,
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_leaf_node_insert() {
        let mut leaf = LeafNode {
            keys: vec![],
            values: vec![],
            next: None,
        };

        leaf.insert(10, b"ten".to_vec());
        leaf.insert(5, b"five".to_vec());
        leaf.insert(15, b"fifteen".to_vec());

        // キーがソートされている
        assert_eq!(leaf.keys, vec![5, 10, 15]);
    }

    #[test]
    fn test_leaf_node_split() {
        let mut leaf = LeafNode {
            keys: vec![1, 2, 3, 4],
            values: vec![b"1".to_vec(), b"2".to_vec(), b"3".to_vec(), b"4".to_vec()],
            next: None,
        };

        let right = leaf.split();

        assert_eq!(leaf.keys, vec![1, 2]);
        assert_eq!(right.keys, vec![3, 4]);
    }
}

src/query/parser.rs

use nom::{
    IResult,
    bytes::complete::{tag, take_while1, take_until},
    character::complete::{alpha1, alphanumeric1, char, multispace0, multispace1},
    combinator::{map, opt},
    multi::separated_list0,
    sequence::{delimited, preceded, tuple},
    branch::alt,
};

/// SQLクエリの種類
#[derive(Debug, Clone, PartialEq)]
pub enum Query {
    Select(SelectQuery),
    Insert(InsertQuery),
    Update(UpdateQuery),
    Delete(DeleteQuery),
    CreateTable(CreateTableQuery),
}

#[derive(Debug, Clone, PartialEq)]
pub struct SelectQuery {
    pub columns: Vec<String>,
    pub table: String,
    pub where_clause: Option<String>,
}

#[derive(Debug, Clone, PartialEq)]
pub struct InsertQuery {
    pub table: String,
    pub columns: Vec<String>,
    pub values: Vec<String>,
}

#[derive(Debug, Clone, PartialEq)]
pub struct UpdateQuery {
    pub table: String,
    pub set_clause: Vec<(String, String)>,
    pub where_clause: Option<String>,
}

#[derive(Debug, Clone, PartialEq)]
pub struct DeleteQuery {
    pub table: String,
    pub where_clause: Option<String>,
}

#[derive(Debug, Clone, PartialEq)]
pub struct CreateTableQuery {
    pub table: String,
    pub columns: Vec<(String, String)>, // (name, type)
}

/// SQLクエリをパース
pub fn parse_query(input: &str) -> Result<Query, String> {
    let input = input.trim();

    if input.to_uppercase().starts_with("SELECT") {
        parse_select(input)
            .map(|(_, q)| Query::Select(q))
            .map_err(|e| format!("Parse error: {:?}", e))
    } else if input.to_uppercase().starts_with("INSERT") {
        parse_insert(input)
            .map(|(_, q)| Query::Insert(q))
            .map_err(|e| format!("Parse error: {:?}", e))
    } else if input.to_uppercase().starts_with("CREATE TABLE") {
        parse_create_table(input)
            .map(|(_, q)| Query::CreateTable(q))
            .map_err(|e| format!("Parse error: {:?}", e))
    } else {
        Err("Unsupported query type".to_string())
    }
}

/// SELECT文のパース
fn parse_select(input: &str) -> IResult<&str, SelectQuery> {
    let (input, _) = tag("SELECT")(input)?;
    let (input, _) = multispace1(input)?;

    // カラムリスト
    let (input, columns) = alt((
        map(tag("*"), |_| vec!["*".to_string()]),
        separated_list0(
            delimited(multispace0, char(','), multispace0),
            map(alphanumeric1, |s: &str| s.to_string()),
        ),
    ))(input)?;

    let (input, _) = multispace1(input)?;
    let (input, _) = tag("FROM")(input)?;
    let (input, _) = multispace1(input)?;

    // テーブル名
    let (input, table) = map(alphanumeric1, |s: &str| s.to_string())(input)?;

    // WHERE句(オプション)
    let (input, where_clause) = opt(preceded(
        tuple((multispace1, tag("WHERE"), multispace1)),
        map(take_until(";"), |s: &str| s.to_string()),
    ))(input)?;

    Ok((
        input,
        SelectQuery {
            columns,
            table,
            where_clause,
        },
    ))
}

/// INSERT文のパース
fn parse_insert(input: &str) -> IResult<&str, InsertQuery> {
    let (input, _) = tag("INSERT INTO")(input)?;
    let (input, _) = multispace1(input)?;

    // テーブル名
    let (input, table) = map(alphanumeric1, |s: &str| s.to_string())(input)?;

    let (input, _) = multispace0(input)?;

    // カラムリスト
    let (input, columns) = delimited(
        char('('),
        separated_list0(
            delimited(multispace0, char(','), multispace0),
            map(alphanumeric1, |s: &str| s.to_string()),
        ),
        char(')'),
    )(input)?;

    let (input, _) = multispace1(input)?;
    let (input, _) = tag("VALUES")(input)?;
    let (input, _) = multispace0(input)?;

    // 値リスト
    let (input, values) = delimited(
        char('('),
        separated_list0(
            delimited(multispace0, char(','), multispace0),
            map(
                alt((
                    delimited(char('\''), take_until("'"), char('\'')),
                    alphanumeric1,
                )),
                |s: &str| s.to_string(),
            ),
        ),
        char(')'),
    )(input)?;

    Ok((
        input,
        InsertQuery {
            table,
            columns,
            values,
        },
    ))
}

/// CREATE TABLE文のパース
fn parse_create_table(input: &str) -> IResult<&str, CreateTableQuery> {
    let (input, _) = tag("CREATE TABLE")(input)?;
    let (input, _) = multispace1(input)?;

    // テーブル名
    let (input, table) = map(alphanumeric1, |s: &str| s.to_string())(input)?;

    let (input, _) = multispace0(input)?;

    // カラム定義
    let (input, columns) = delimited(
        char('('),
        separated_list0(
            delimited(multispace0, char(','), multispace0),
            map(
                tuple((
                    alphanumeric1,
                    multispace1,
                    alphanumeric1,
                )),
                |(name, _, typ): (&str, &str, &str)| {
                    (name.to_string(), typ.to_string())
                },
            ),
        ),
        char(')'),
    )(input)?;

    Ok((input, CreateTableQuery { table, columns }))
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_parse_select() {
        let sql = "SELECT * FROM users WHERE id = 1;";
        let result = parse_query(sql).unwrap();

        if let Query::Select(q) = result {
            assert_eq!(q.table, "users");
            assert_eq!(q.columns, vec!["*"]);
            assert!(q.where_clause.is_some());
        } else {
            panic!("Expected Select query");
        }
    }

    #[test]
    fn test_parse_insert() {
        let sql = "INSERT INTO users (id, name) VALUES (1, 'Alice');";
        let result = parse_query(sql).unwrap();

        if let Query::Insert(q) = result {
            assert_eq!(q.table, "users");
            assert_eq!(q.columns, vec!["id", "name"]);
            assert_eq!(q.values, vec!["1", "Alice"]);
        } else {
            panic!("Expected Insert query");
        }
    }

    #[test]
    fn test_parse_create_table() {
        let sql = "CREATE TABLE users (id INT, name VARCHAR);";
        let result = parse_query(sql).unwrap();

        if let Query::CreateTable(q) = result {
            assert_eq!(q.table, "users");
            assert_eq!(q.columns.len(), 2);
        } else {
            panic!("Expected CreateTable query");
        }
    }
}

src/main.rs

use rust_db::{
    query::parser::parse_query,
    storage::page::{Page, PageType},
};
use std::io::{self, Write};

fn main() {
    println!("=== Rust Database Engine ===");
    println!("Enter SQL queries (type 'exit' to quit):\n");

    let mut buffer = String::new();

    loop {
        print!("db> ");
        io::stdout().flush().unwrap();

        buffer.clear();
        io::stdin().read_line(&mut buffer).unwrap();

        let input = buffer.trim();

        if input.eq_ignore_ascii_case("exit") {
            println!("Goodbye!");
            break;
        }

        if input.is_empty() {
            continue;
        }

        match parse_query(input) {
            Ok(query) => {
                println!("Parsed: {:?}", query);
                // ここでクエリを実行
                execute_query(query);
            }
            Err(e) => {
                println!("Error: {}", e);
            }
        }
    }
}

fn execute_query(query: rust_db::query::parser::Query) {
    use rust_db::query::parser::Query;

    match query {
        Query::Select(q) => {
            println!("Executing SELECT on table '{}'", q.table);
            // 実際の実装ではテーブルスキャン
        }
        Query::Insert(q) => {
            println!("Executing INSERT into table '{}'", q.table);
            // 実際の実装ではレコード挿入
        }
        Query::CreateTable(q) => {
            println!("Creating table '{}'", q.table);
            // 実際の実装ではスキーマ作成
        }
        _ => {
            println!("Query execution not implemented");
        }
    }
}

---

解説

実装のポイント

1. ページベースストレージ

固定サイズページ:

pub const PAGE_SIZE: usize = 4096;

pub struct Page {
    pub header: PageHeader,
    pub data: Vec<u8>,
}

  • OSのページサイズに合わせて4KB
  • ヘッダーとデータ領域を分離
  • ディスクI/Oの単位

2. B+ツリーインデックス

リーフノードの連結:

pub struct LeafNode {
    pub keys: Vec<i64>,
    pub values: Vec<Vec<u8>>,
    pub next: Option<u32>, // 次のリーフ
}

  • 範囲検索が効率的
  • リーフノード間を順次走査
  • O(log N)の検索時間

3. SQLパーサー

nom によるパース:

fn parse_select(input: &str) -> IResult<&str, SelectQuery> {
    let (input, _) = tag("SELECT")(input)?;
    let (input, columns) = separated_list0(...)(input)?;
    // ...
}

  • パーサコンビネータで構文解析
  • 型安全なパース
  • エラー回復可能

4. トランザクションログ

Write-Ahead Logging:

// 変更前にログに書き込み
log.write_entry(LogEntry::Update { ... });
log.flush(); // ディスクに同期

// 実際のデータを変更
page.update_record(...);

  • 障害からの復旧を保証
  • ACID特性のD(耐久性)を実現
  • リカバリ時にログをリプレイ
  • 設計判断

  • ページサイズ4KB: OSのページサイズに合わせ、ディスクI/Oを最適化
  • B+ツリー: 範囲検索に強く、リーフが連結されている
  • 固定長レコード: 実装をシンプルに(実際は可変長も必要)
  • シンプルなSQL: SELECT、INSERT、CREATE TABLEに限定
  • 代替案

  • LSMツリー: 書き込み性能を重視(RocksDB等)
  • 可変長レコード: スロット配列で管理
  • MVCC: マルチバージョン並行制御
  • クエリ最適化: コストベースオプティマイザ
  • ---

    学習の意図

    習得する概念

  • ストレージ:
- ページ管理 - バッファプール - ディスクI/O - 永続化

  • インデックス:
- B+ツリー - ハッシュインデックス - 検索アルゴリズム - 空間計算量

  • クエリ処理:
- 構文解析 - 実行計画 - 最適化 - 演算子

  • トランザクション:
- ACID特性 - ログベースリカバリ - 並行制御 - ロック

CSの基礎との関連

データ構造:

  • B+ツリー(多分木)
  • ハッシュテーブル
  • ヒープファイル
  • リングバッファ

アルゴリズム:

  • 二分探索
  • ソート
  • ページ置換(LRU)
  • ロックプロトコル

OS:

  • ファイルシステム
  • ページング
  • バッファ管理
  • 並行処理

分散システム:

  • CAP定理
  • 2相コミット
  • レプリケーション
  • シャーディング
  • ---

    テスト方法

    単体テスト

    cargo test
    

    統合テスト

    # データベースREPLを起動
    cargo run
    
    # テストクエリ
    db> CREATE TABLE users (id INT, name VARCHAR);
    db> INSERT INTO users (id, name) VALUES (1, 'Alice');
    db> SELECT * FROM users;
    db> exit
    

    ベンチマーク

    # 大量INSERT
    time cargo run -- benchmark insert 10000
    
    # 大量SELECT
    time cargo run -- benchmark select 10000
    

    ---

    評価基準

    必須要件(60点)

  • [ ] ページベースストレージが動作する(15点)
  • [ ] レコードの挿入・取得ができる(10点)
  • [ ] B+ツリーの基本実装(15点)
  • [ ] SQLパーサーが動作する(10点)
  • [ ] トランザクションログの基本実装(10点)
  • 標準要件(30点)

  • [ ] バッファプール管理(10点)
  • [ ] 範囲検索の実装(5点)
  • [ ] UPDATE/DELETE文のサポート(10点)
  • [ ] ロールバック機能(5点)
  • 発展要件(10点)

  • [ ] インデックススキャン最適化(3点)
  • [ ] JOIN操作の実装(4点)
  • [ ] MVCC(マルチバージョン並行制御)(3点)
  • ボーナス(+10点)

  • [ ] クエリオプティマイザ(+5点)
  • [ ] ネットワークプロトコル(PostgreSQLプロトコル)(+5点)
  • ---

    参考資料

    データベース基礎

  • Database System Concepts (Silberschatz)
  • CMU 15-445 Database Systems
  • Architecture of a Database System
  • 実装参考

  • SQLite Architecture
  • PostgreSQL Internals
  • TiKV (Rust distributed database)