第17章: 非同期プログラミング (Async Programming)
学習目標
- async/await 構文を理解する
- Future トレイトの仕組みを学ぶ
- tokio ランタイムの基本を理解する
- 非同期I/Oとタスクの並行実行を学ぶ
---
17.1 非同期プログラミングとは
17.1.1 同期 vs 非同期
┌─────────────────────────────────────────────────────────┐
│ 同期処理 vs 非同期処理 │
├─────────────────────────────────────────────────────────┤
│ │
│ 【同期処理】 │
│ - 処理が完了するまでブロック │
│ - スレッドが待機状態になる │
│ - シンプルだが非効率 │
│ │
│ 【非同期処理】 │
│ - 処理が完了していなくても他の作業を実行 │
│ - スレッドを有効活用 │
│ - 複雑だが効率的 │
│ │
└─────────────────────────────────────────────────────────┘
17.1.2 スレッド vs 非同期
use std::thread;
use std::time::Duration;
fn main() {
// スレッドベース(重い)
let handles: Vec<_> = (0..1000)
.map(|i| {
thread::spawn(move || {
thread::sleep(Duration::from_secs(1));
println!("Thread {}", i);
})
})
.collect();
// 1000個のOSスレッドを生成!メモリとコンテキストスイッチのコストが高い
}
use tokio::time::{sleep, Duration};
#[tokio::main]
async fn main() {
// 非同期タスク(軽量)
let handles: Vec<_> = (0..1000)
.map(|i| {
tokio::spawn(async move {
sleep(Duration::from_secs(1)).await;
println!("Task {}", i);
})
})
.collect();
// 1000個の軽量タスク!1つのスレッドで効率的に実行可能
}
---
17.2 async/await 基本
17.2.1 async 関数
// 通常の関数
fn sync_function() -> i32 {
42
}
// 非同期関数
async fn async_function() -> i32 {
42
}
fn main() {
// async関数は Future を返す
let future = async_function();
// Futureを実行するにはランタイムが必要
}
17.2.2 await 演算子
use tokio::time::{sleep, Duration};
async fn say_hello() {
println!("Hello!");
}
async fn example() {
// .await でFutureの完了を待つ
say_hello().await;
println!("World!");
}
#[tokio::main]
async fn main() {
example().await;
}
17.2.3 async ブロック
#[tokio::main]
async fn main() {
let future = async {
println!("In async block");
42
};
let result = future.await;
println!("Result: {}", result);
}
---
17.3 Future トレイト
17.3.1 Future の定義
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
pub trait Future {
type Output;
fn poll(
self: Pin<&mut Self>,
cx: &mut Context<'_>
) -> Poll<Self::Output>;
}
pub enum Poll<T> {
Ready(T), // 完了
Pending, // まだ完了していない
}
17.3.2 簡単なFutureの実装
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
struct CountdownFuture {
count: u32,
}
impl Future for CountdownFuture {
type Output = String;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if self.count == 0 {
Poll::Ready("Done!".to_string())
} else {
println!("Countdown: {}", self.count);
self.count -= 1;
cx.waker().wake_by_ref();
Poll::Pending
}
}
}
#[tokio::main]
async fn main() {
let future = CountdownFuture { count: 3 };
let result = future.await;
println!("{}", result);
}
---
17.4 tokio ランタイム
17.4.1 tokio の基本
use tokio;
// #[tokio::main] マクロでランタイムを自動生成
#[tokio::main]
async fn main() {
println!("Hello from tokio!");
}
// 上記は以下と同等:
fn main() {
tokio::runtime::Runtime::new()
.unwrap()
.block_on(async {
println!("Hello from tokio!");
});
}
17.4.2 マルチスレッドランタイム
#[tokio::main(flavor = "multi_thread", worker_threads = 4)]
async fn main() {
// 4つのワーカースレッドで実行
}
17.4.3 タスクのスポーン
use tokio;
#[tokio::main]
async fn main() {
let handle = tokio::spawn(async {
println!("Task running");
42
});
let result = handle.await.unwrap();
println!("Result: {}", result);
}
---
17.5 非同期I/O
17.5.1 非同期ファイル読み込み
use tokio::fs::File;
use tokio::io::{self, AsyncReadExt};
#[tokio::main]
async fn main() -> io::Result<()> {
let mut file = File::open("example.txt").await?;
let mut contents = String::new();
file.read_to_string(&mut contents).await?;
println!("Contents: {}", contents);
Ok(())
}
17.5.2 非同期ネットワーク
use tokio::net::TcpListener;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
#[tokio::main]
async fn main() -> std::io::Result<()> {
let listener = TcpListener::bind("127.0.0.1:8080").await?;
println!("Server listening on port 8080");
loop {
let (mut socket, _) = listener.accept().await?;
tokio::spawn(async move {
let mut buf = [0; 1024];
loop {
let n = match socket.read(&mut buf).await {
Ok(n) if n == 0 => return,
Ok(n) => n,
Err(e) => {
eprintln!("Error: {}", e);
return;
}
};
if socket.write_all(&buf[0..n]).await.is_err() {
return;
}
}
});
}
}
---
17.6 並行実行パターン
17.6.1 join! - すべて完了を待つ
use tokio::time::{sleep, Duration};
async fn task1() -> i32 {
sleep(Duration::from_secs(1)).await;
1
}
async fn task2() -> i32 {
sleep(Duration::from_secs(2)).await;
2
}
#[tokio::main]
async fn main() {
let (r1, r2) = tokio::join!(task1(), task2());
println!("Results: {} and {}", r1, r2); // 2秒後に完了
}
17.6.2 select! - 最初に完了したものを待つ
use tokio::time::{sleep, Duration};
async fn task1() -> &'static str {
sleep(Duration::from_secs(1)).await;
"task1"
}
async fn task2() -> &'static str {
sleep(Duration::from_secs(2)).await;
"task2"
}
#[tokio::main]
async fn main() {
tokio::select! {
result = task1() => {
println!("task1 finished first: {}", result);
}
result = task2() => {
println!("task2 finished first: {}", result);
}
}
}
17.6.3 spawn と JoinHandle
use tokio;
#[tokio::main]
async fn main() {
let handles: Vec<_> = (0..10)
.map(|i| {
tokio::spawn(async move {
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
i * 2
})
})
.collect();
let results: Vec<_> = futures::future::join_all(handles)
.await
.into_iter()
.map(|r| r.unwrap())
.collect();
println!("{:?}", results);
}
---
17.7 実践例:HTTPクライアント
17.7.1 reqwest を使ったHTTPリクエスト
use reqwest;
use tokio;
#[tokio::main]
async fn main() -> Result<(), reqwest::Error> {
let url = "https://api.github.com/repos/rust-lang/rust";
let response = reqwest::get(url).await?;
let body = response.text().await?;
println!("Response: {}", body);
Ok(())
}
17.7.2 並列HTTPリクエスト
use reqwest;
use tokio;
async fn fetch_url(url: &str) -> Result<String, reqwest::Error> {
let response = reqwest::get(url).await?;
let body = response.text().await?;
Ok(body)
}
#[tokio::main]
async fn main() {
let urls = vec![
"https://example.com",
"https://rust-lang.org",
"https://github.com",
];
let futures: Vec<_> = urls
.iter()
.map(|url| fetch_url(url))
.collect();
let results = futures::future::join_all(futures).await;
for (i, result) in results.iter().enumerate() {
match result {
Ok(body) => println!("URL {}: {} bytes", i, body.len()),
Err(e) => println!("URL {}: Error {}", i, e),
}
}
}
---
17.8 エラーハンドリング
17.8.1 async 関数での Result
use tokio::fs::File;
use tokio::io::{self, AsyncReadExt};
async fn read_file(path: &str) -> io::Result<String> {
let mut file = File::open(path).await?;
let mut contents = String::new();
file.read_to_string(&mut contents).await?;
Ok(contents)
}
#[tokio::main]
async fn main() {
match read_file("example.txt").await {
Ok(contents) => println!("Contents: {}", contents),
Err(e) => eprintln!("Error: {}", e),
}
}
17.8.2 複数のエラー型
use std::error::Error;
async fn complex_operation() -> Result<String, Box<dyn Error>> {
let response = reqwest::get("https://example.com").await?;
let text = response.text().await?;
let file = tokio::fs::File::create("output.txt").await?;
// ...
Ok(text)
}
---
17.9 ストリーム
17.9.1 Stream トレイト
use futures::stream::{self, StreamExt};
#[tokio::main]
async fn main() {
let mut stream = stream::iter(vec![1, 2, 3, 4, 5]);
while let Some(value) = stream.next().await {
println!("Value: {}", value);
}
}
17.9.2 カスタムストリーム
use futures::stream::{Stream, StreamExt};
use std::pin::Pin;
use std::task::{Context, Poll};
struct CounterStream {
current: u32,
max: u32,
}
impl Stream for CounterStream {
type Item = u32;
fn poll_next(
mut self: Pin<&mut Self>,
_cx: &mut Context<'_>
) -> Poll<Option<Self::Item>> {
if self.current < self.max {
let current = self.current;
self.current += 1;
Poll::Ready(Some(current))
} else {
Poll::Ready(None)
}
}
}
#[tokio::main]
async fn main() {
let mut stream = CounterStream { current: 0, max: 5 };
while let Some(value) = stream.next().await {
println!("Count: {}", value);
}
}
---
17.10 ベストプラクティス
17.10.1 async vs スレッド
┌─────────────────────────────────────────────────────────┐
│ async を使うべき場面 │
├─────────────────────────────────────────────────────────┤
│ │
│ ✓ I/O バウンドな処理(ネットワーク、ファイル) │
│ ✓ 大量の並行接続 │
│ ✓ レイテンシが重要 │
│ │
│ 【スレッドを使うべき場面】 │
│ │
│ ✓ CPU バウンドな処理 │
│ ✓ 並列計算 │
│ ✓ ブロッキングライブラリを使う場合 │
│ │
└─────────────────────────────────────────────────────────┘
17.10.2 避けるべきパターン
// 悪い例: async関数内でブロッキング操作
async fn bad_example() {
std::thread::sleep(std::time::Duration::from_secs(1)); // NG!
}
// 良い例: 非同期sleepを使う
async fn good_example() {
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; // OK!
}
// ブロッキング操作が必要な場合
async fn with_blocking() {
tokio::task::spawn_blocking(|| {
// ブロッキング操作
std::thread::sleep(std::time::Duration::from_secs(1));
}).await.unwrap();
}
---
17.11 まとめ
非同期プログラミングの利点
┌─────────────────────────────────────────────────────────┐
│ Rustの非同期プログラミング │
├─────────────────────────────────────────────────────────┤
│ │
│ ✓ ゼロコスト抽象化 │
│ └─ 手書き状態機械と同等の性能 │
│ │
│ ✓ メモリ効率 │
│ └─ スレッドより軽量 │
│ │
│ ✓ 安全性 │
│ └─ 型システムで保証 │
│ │
│ ✓ 柔軟性 │
│ └─ 複数のランタイムを選択可能 │
│ │
└─────────────────────────────────────────────────────────┘
次のステップ
次の章では、unsafe Rustについて学びます。rawポインタ、FFI、Cライブラリとの連携を理解します。
---