Rust Elective 04: データベースエンジン - RDBMS内部構造の実装
課題説明
概要
リレーショナルデータベース管理システム(RDBMS)は、構造化データを効率的に保存・検索するための基盤技術です。本課題では、Rustを使ってシンプルなデータベースエンジンを実装し、ストレージエンジン、インデックス、クエリ処理、トランザクションの基礎を学びます。
背景と動機
データベースの重要性:
- 永続化: データの安全な保存と復旧
- 並行制御: 複数ユーザーの同時アクセス
- ACID特性: 原子性、一貫性、独立性、耐久性
- 効率的検索: インデックスによる高速クエリ
Rustの優位性:
- メモリ安全性(データの破損防止)
- ゼロコストの抽象化(高性能)
- 並行処理に強い(マルチスレッド安全)
- ファイルI/Oの安全性
- ストレージエンジン:
課題要件
以下の機能を持つデータベースエンジンを実装してください:
- B+ツリーインデックス:
- クエリ処理:
- トランザクション:
- データ型とスキーマ:
制約条件
- ページサイズは4KBとすること
- B+ツリーのオーダーは可変にすること
- クエリは基本的なSQLに対応すること
- トランザクションログを実装すること
- エラーハンドリングを適切に実装すること
---
想定解答
プロジェクト構造
rust_db/
├── Cargo.toml
├── src/
│ ├── main.rs
│ ├── lib.rs
│ ├── storage/
│ │ ├── mod.rs
│ │ ├── page.rs
│ │ ├── heap_file.rs
│ │ └── buffer_pool.rs
│ ├── index/
│ │ ├── mod.rs
│ │ └── btree.rs
│ ├── query/
│ │ ├── mod.rs
│ │ ├── parser.rs
│ │ ├── executor.rs
│ │ └── planner.rs
│ ├── transaction/
│ │ ├── mod.rs
│ │ ├── log.rs
│ │ └── manager.rs
│ └── schema/
│ ├── mod.rs
│ ├── table.rs
│ └── types.rs
├── tests/
│ └── integration_tests.rs
└── README.md
Cargo.toml
[package]
name = "rust_db"
version = "0.1.0"
edition = "2021"
[dependencies]
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
bincode = "1.3"
thiserror = "1.0"
anyhow = "1.0"
nom = "7.1"
log = "0.4"
env_logger = "0.11"
[dev-dependencies]
tempfile = "3.8"
src/storage/page.rs
use serde::{Deserialize, Serialize};
use std::io::{self, Read, Write};
pub const PAGE_SIZE: usize = 4096;
/// ページID
pub type PageId = u32;
/// ページヘッダー
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PageHeader {
pub page_id: PageId,
pub page_type: PageType,
pub free_space_offset: u16,
pub num_records: u16,
}
/// ページタイプ
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq)]
pub enum PageType {
Data, // データページ
Index, // インデックスページ
Overflow, // オーバーフローページ
}
/// ページ構造
#[derive(Debug, Clone)]
pub struct Page {
pub header: PageHeader,
pub data: Vec<u8>,
}
impl Page {
/// 新しいページを作成
pub fn new(page_id: PageId, page_type: PageType) -> Self {
let header = PageHeader {
page_id,
page_type,
free_space_offset: std::mem::size_of::<PageHeader>() as u16,
num_records: 0,
};
let data = vec![0u8; PAGE_SIZE - std::mem::size_of::<PageHeader>()];
Page { header, data }
}
/// レコードを挿入
pub fn insert_record(&mut self, record: &[u8]) -> Result<u16, String> {
if record.len() > self.free_space() {
return Err("Not enough space in page".to_string());
}
let slot_id = self.header.num_records;
let offset = self.header.free_space_offset as usize;
// レコードデータをコピー
self.data[offset..offset + record.len()].copy_from_slice(record);
// ヘッダー更新
self.header.free_space_offset += record.len() as u16;
self.header.num_records += 1;
Ok(slot_id)
}
/// レコードを取得
pub fn get_record(&self, slot_id: u16) -> Option<Vec<u8>> {
if slot_id >= self.header.num_records {
return None;
}
// 簡易実装:固定長レコードと仮定
let record_size = 128; // 仮の固定サイズ
let offset = slot_id as usize * record_size;
if offset + record_size > self.data.len() {
return None;
}
Some(self.data[offset..offset + record_size].to_vec())
}
/// 空き容量を返す
pub fn free_space(&self) -> usize {
PAGE_SIZE - self.header.free_space_offset as usize
}
/// ページをバイト列にシリアライズ
pub fn to_bytes(&self) -> io::Result<Vec<u8>> {
let mut bytes = Vec::with_capacity(PAGE_SIZE);
// ヘッダーをシリアライズ
let header_bytes = bincode::serialize(&self.header)
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
bytes.write_all(&header_bytes)?;
// データを書き込み
bytes.write_all(&self.data)?;
// ページサイズにパディング
bytes.resize(PAGE_SIZE, 0);
Ok(bytes)
}
/// バイト列からページを復元
pub fn from_bytes(bytes: &[u8]) -> io::Result<Self> {
if bytes.len() != PAGE_SIZE {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
"Invalid page size",
));
}
// ヘッダーサイズを推定
let header_size = std::mem::size_of::<PageHeader>();
// ヘッダーをデシリアライズ
let header: PageHeader = bincode::deserialize(&bytes[..header_size])
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
// データ部分を抽出
let data = bytes[header_size..].to_vec();
Ok(Page { header, data })
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_page_creation() {
let page = Page::new(1, PageType::Data);
assert_eq!(page.header.page_id, 1);
assert_eq!(page.header.page_type, PageType::Data);
assert!(page.free_space() > 0);
}
#[test]
fn test_record_insert() {
let mut page = Page::new(1, PageType::Data);
let record = b"Hello, Database!";
let slot_id = page.insert_record(record).unwrap();
assert_eq!(slot_id, 0);
assert_eq!(page.header.num_records, 1);
}
#[test]
fn test_serialization() {
let page = Page::new(42, PageType::Data);
let bytes = page.to_bytes().unwrap();
let restored = Page::from_bytes(&bytes).unwrap();
assert_eq!(page.header.page_id, restored.header.page_id);
assert_eq!(page.header.page_type, restored.header.page_type);
}
}
src/index/btree.rs
use std::cmp::Ordering;
use serde::{Deserialize, Serialize};
/// B+ツリーのノード
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum BTreeNode {
Internal(InternalNode),
Leaf(LeafNode),
}
/// 内部ノード
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct InternalNode {
pub keys: Vec<i64>,
pub children: Vec<u32>, // PageId
}
/// リーフノード
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LeafNode {
pub keys: Vec<i64>,
pub values: Vec<Vec<u8>>,
pub next: Option<u32>, // 次のリーフへのポインタ
}
/// B+ツリー
pub struct BTree {
pub root: Option<u32>,
pub order: usize, // 最大キー数
}
impl BTree {
/// 新しいB+ツリーを作成
pub fn new(order: usize) -> Self {
BTree { root: None, order }
}
/// キーで検索
pub fn search(&self, key: i64) -> Option<Vec<u8>> {
// 簡易実装:実際にはページを読み込んで検索
None
}
/// キーと値を挿入
pub fn insert(&mut self, key: i64, value: Vec<u8>) -> Result<(), String> {
// ルートが存在しない場合、新しいリーフノードを作成
if self.root.is_none() {
let leaf = LeafNode {
keys: vec![key],
values: vec![value],
next: None,
};
// 実際にはページに書き込む
self.root = Some(0);
return Ok(());
}
// 実際の実装ではここで挿入とノード分割を行う
Ok(())
}
/// 範囲検索
pub fn range_search(&self, start: i64, end: i64) -> Vec<Vec<u8>> {
// 実際にはリーフノードを順次走査
Vec::new()
}
}
impl LeafNode {
/// キーを挿入(ソート済みを維持)
pub fn insert(&mut self, key: i64, value: Vec<u8>) {
let pos = self.keys.binary_search(&key).unwrap_or_else(|e| e);
self.keys.insert(pos, key);
self.values.insert(pos, value);
}
/// ノードが満杯か
pub fn is_full(&self, order: usize) -> bool {
self.keys.len() >= order
}
/// ノードを分割
pub fn split(&mut self) -> LeafNode {
let mid = self.keys.len() / 2;
let right_keys = self.keys.split_off(mid);
let right_values = self.values.split_off(mid);
LeafNode {
keys: right_keys,
values: right_values,
next: self.next,
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_leaf_node_insert() {
let mut leaf = LeafNode {
keys: vec![],
values: vec![],
next: None,
};
leaf.insert(10, b"ten".to_vec());
leaf.insert(5, b"five".to_vec());
leaf.insert(15, b"fifteen".to_vec());
// キーがソートされている
assert_eq!(leaf.keys, vec![5, 10, 15]);
}
#[test]
fn test_leaf_node_split() {
let mut leaf = LeafNode {
keys: vec![1, 2, 3, 4],
values: vec![b"1".to_vec(), b"2".to_vec(), b"3".to_vec(), b"4".to_vec()],
next: None,
};
let right = leaf.split();
assert_eq!(leaf.keys, vec![1, 2]);
assert_eq!(right.keys, vec![3, 4]);
}
}
src/query/parser.rs
use nom::{
IResult,
bytes::complete::{tag, take_while1, take_until},
character::complete::{alpha1, alphanumeric1, char, multispace0, multispace1},
combinator::{map, opt},
multi::separated_list0,
sequence::{delimited, preceded, tuple},
branch::alt,
};
/// SQLクエリの種類
#[derive(Debug, Clone, PartialEq)]
pub enum Query {
Select(SelectQuery),
Insert(InsertQuery),
Update(UpdateQuery),
Delete(DeleteQuery),
CreateTable(CreateTableQuery),
}
#[derive(Debug, Clone, PartialEq)]
pub struct SelectQuery {
pub columns: Vec<String>,
pub table: String,
pub where_clause: Option<String>,
}
#[derive(Debug, Clone, PartialEq)]
pub struct InsertQuery {
pub table: String,
pub columns: Vec<String>,
pub values: Vec<String>,
}
#[derive(Debug, Clone, PartialEq)]
pub struct UpdateQuery {
pub table: String,
pub set_clause: Vec<(String, String)>,
pub where_clause: Option<String>,
}
#[derive(Debug, Clone, PartialEq)]
pub struct DeleteQuery {
pub table: String,
pub where_clause: Option<String>,
}
#[derive(Debug, Clone, PartialEq)]
pub struct CreateTableQuery {
pub table: String,
pub columns: Vec<(String, String)>, // (name, type)
}
/// SQLクエリをパース
pub fn parse_query(input: &str) -> Result<Query, String> {
let input = input.trim();
if input.to_uppercase().starts_with("SELECT") {
parse_select(input)
.map(|(_, q)| Query::Select(q))
.map_err(|e| format!("Parse error: {:?}", e))
} else if input.to_uppercase().starts_with("INSERT") {
parse_insert(input)
.map(|(_, q)| Query::Insert(q))
.map_err(|e| format!("Parse error: {:?}", e))
} else if input.to_uppercase().starts_with("CREATE TABLE") {
parse_create_table(input)
.map(|(_, q)| Query::CreateTable(q))
.map_err(|e| format!("Parse error: {:?}", e))
} else {
Err("Unsupported query type".to_string())
}
}
/// SELECT文のパース
fn parse_select(input: &str) -> IResult<&str, SelectQuery> {
let (input, _) = tag("SELECT")(input)?;
let (input, _) = multispace1(input)?;
// カラムリスト
let (input, columns) = alt((
map(tag("*"), |_| vec!["*".to_string()]),
separated_list0(
delimited(multispace0, char(','), multispace0),
map(alphanumeric1, |s: &str| s.to_string()),
),
))(input)?;
let (input, _) = multispace1(input)?;
let (input, _) = tag("FROM")(input)?;
let (input, _) = multispace1(input)?;
// テーブル名
let (input, table) = map(alphanumeric1, |s: &str| s.to_string())(input)?;
// WHERE句(オプション)
let (input, where_clause) = opt(preceded(
tuple((multispace1, tag("WHERE"), multispace1)),
map(take_until(";"), |s: &str| s.to_string()),
))(input)?;
Ok((
input,
SelectQuery {
columns,
table,
where_clause,
},
))
}
/// INSERT文のパース
fn parse_insert(input: &str) -> IResult<&str, InsertQuery> {
let (input, _) = tag("INSERT INTO")(input)?;
let (input, _) = multispace1(input)?;
// テーブル名
let (input, table) = map(alphanumeric1, |s: &str| s.to_string())(input)?;
let (input, _) = multispace0(input)?;
// カラムリスト
let (input, columns) = delimited(
char('('),
separated_list0(
delimited(multispace0, char(','), multispace0),
map(alphanumeric1, |s: &str| s.to_string()),
),
char(')'),
)(input)?;
let (input, _) = multispace1(input)?;
let (input, _) = tag("VALUES")(input)?;
let (input, _) = multispace0(input)?;
// 値リスト
let (input, values) = delimited(
char('('),
separated_list0(
delimited(multispace0, char(','), multispace0),
map(
alt((
delimited(char('\''), take_until("'"), char('\'')),
alphanumeric1,
)),
|s: &str| s.to_string(),
),
),
char(')'),
)(input)?;
Ok((
input,
InsertQuery {
table,
columns,
values,
},
))
}
/// CREATE TABLE文のパース
fn parse_create_table(input: &str) -> IResult<&str, CreateTableQuery> {
let (input, _) = tag("CREATE TABLE")(input)?;
let (input, _) = multispace1(input)?;
// テーブル名
let (input, table) = map(alphanumeric1, |s: &str| s.to_string())(input)?;
let (input, _) = multispace0(input)?;
// カラム定義
let (input, columns) = delimited(
char('('),
separated_list0(
delimited(multispace0, char(','), multispace0),
map(
tuple((
alphanumeric1,
multispace1,
alphanumeric1,
)),
|(name, _, typ): (&str, &str, &str)| {
(name.to_string(), typ.to_string())
},
),
),
char(')'),
)(input)?;
Ok((input, CreateTableQuery { table, columns }))
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_parse_select() {
let sql = "SELECT * FROM users WHERE id = 1;";
let result = parse_query(sql).unwrap();
if let Query::Select(q) = result {
assert_eq!(q.table, "users");
assert_eq!(q.columns, vec!["*"]);
assert!(q.where_clause.is_some());
} else {
panic!("Expected Select query");
}
}
#[test]
fn test_parse_insert() {
let sql = "INSERT INTO users (id, name) VALUES (1, 'Alice');";
let result = parse_query(sql).unwrap();
if let Query::Insert(q) = result {
assert_eq!(q.table, "users");
assert_eq!(q.columns, vec!["id", "name"]);
assert_eq!(q.values, vec!["1", "Alice"]);
} else {
panic!("Expected Insert query");
}
}
#[test]
fn test_parse_create_table() {
let sql = "CREATE TABLE users (id INT, name VARCHAR);";
let result = parse_query(sql).unwrap();
if let Query::CreateTable(q) = result {
assert_eq!(q.table, "users");
assert_eq!(q.columns.len(), 2);
} else {
panic!("Expected CreateTable query");
}
}
}
src/main.rs
use rust_db::{
query::parser::parse_query,
storage::page::{Page, PageType},
};
use std::io::{self, Write};
fn main() {
println!("=== Rust Database Engine ===");
println!("Enter SQL queries (type 'exit' to quit):\n");
let mut buffer = String::new();
loop {
print!("db> ");
io::stdout().flush().unwrap();
buffer.clear();
io::stdin().read_line(&mut buffer).unwrap();
let input = buffer.trim();
if input.eq_ignore_ascii_case("exit") {
println!("Goodbye!");
break;
}
if input.is_empty() {
continue;
}
match parse_query(input) {
Ok(query) => {
println!("Parsed: {:?}", query);
// ここでクエリを実行
execute_query(query);
}
Err(e) => {
println!("Error: {}", e);
}
}
}
}
fn execute_query(query: rust_db::query::parser::Query) {
use rust_db::query::parser::Query;
match query {
Query::Select(q) => {
println!("Executing SELECT on table '{}'", q.table);
// 実際の実装ではテーブルスキャン
}
Query::Insert(q) => {
println!("Executing INSERT into table '{}'", q.table);
// 実際の実装ではレコード挿入
}
Query::CreateTable(q) => {
println!("Creating table '{}'", q.table);
// 実際の実装ではスキーマ作成
}
_ => {
println!("Query execution not implemented");
}
}
}
---
解説
実装のポイント
1. ページベースストレージ
固定サイズページ:
pub const PAGE_SIZE: usize = 4096;
pub struct Page {
pub header: PageHeader,
pub data: Vec<u8>,
}
- OSのページサイズに合わせて4KB
- ヘッダーとデータ領域を分離
- ディスクI/Oの単位
2. B+ツリーインデックス
リーフノードの連結:
pub struct LeafNode {
pub keys: Vec<i64>,
pub values: Vec<Vec<u8>>,
pub next: Option<u32>, // 次のリーフ
}
- 範囲検索が効率的
- リーフノード間を順次走査
- O(log N)の検索時間
3. SQLパーサー
nom によるパース:
fn parse_select(input: &str) -> IResult<&str, SelectQuery> {
let (input, _) = tag("SELECT")(input)?;
let (input, columns) = separated_list0(...)(input)?;
// ...
}
- パーサコンビネータで構文解析
- 型安全なパース
- エラー回復可能
4. トランザクションログ
Write-Ahead Logging:
// 変更前にログに書き込み
log.write_entry(LogEntry::Update { ... });
log.flush(); // ディスクに同期
// 実際のデータを変更
page.update_record(...);
- 障害からの復旧を保証
- ACID特性のD(耐久性)を実現
- リカバリ時にログをリプレイ
- ページサイズ4KB: OSのページサイズに合わせ、ディスクI/Oを最適化
- B+ツリー: 範囲検索に強く、リーフが連結されている
- 固定長レコード: 実装をシンプルに(実際は可変長も必要)
- シンプルなSQL: SELECT、INSERT、CREATE TABLEに限定
- LSMツリー: 書き込み性能を重視(RocksDB等)
- 可変長レコード: スロット配列で管理
- MVCC: マルチバージョン並行制御
- クエリ最適化: コストベースオプティマイザ
- ストレージ:
設計判断
代替案
---
学習の意図
習得する概念
- インデックス:
- クエリ処理:
- トランザクション:
CSの基礎との関連
データ構造:
- B+ツリー(多分木)
- ハッシュテーブル
- ヒープファイル
- リングバッファ
アルゴリズム:
- 二分探索
- ソート
- ページ置換(LRU)
- ロックプロトコル
OS:
- ファイルシステム
- ページング
- バッファ管理
- 並行処理
分散システム:
- CAP定理
- 2相コミット
- レプリケーション
- シャーディング
---
テスト方法
単体テスト
cargo test
統合テスト
# データベースREPLを起動
cargo run
# テストクエリ
db> CREATE TABLE users (id INT, name VARCHAR);
db> INSERT INTO users (id, name) VALUES (1, 'Alice');
db> SELECT * FROM users;
db> exit
ベンチマーク
# 大量INSERT
time cargo run -- benchmark insert 10000
# 大量SELECT
time cargo run -- benchmark select 10000
---
評価基準
必須要件(60点)
標準要件(30点)
発展要件(10点)
ボーナス(+10点)
---