第17章: 非同期プログラミング (Async Programming)

学習目標

  • async/await 構文を理解する
  • Future トレイトの仕組みを学ぶ
  • tokio ランタイムの基本を理解する
  • 非同期I/Oとタスクの並行実行を学ぶ
  • ---

    17.1 非同期プログラミングとは

    17.1.1 同期 vs 非同期

    ┌─────────────────────────────────────────────────────────┐
    │          同期処理 vs 非同期処理                          │
    ├─────────────────────────────────────────────────────────┤
    │                                                         │
    │  【同期処理】                                            │
    │  - 処理が完了するまでブロック                            │
    │  - スレッドが待機状態になる                              │
    │  - シンプルだが非効率                                    │
    │                                                         │
    │  【非同期処理】                                          │
    │  - 処理が完了していなくても他の作業を実行                │
    │  - スレッドを有効活用                                    │
    │  - 複雑だが効率的                                        │
    │                                                         │
    └─────────────────────────────────────────────────────────┘
    

    17.1.2 スレッド vs 非同期

    use std::thread;
    use std::time::Duration;
    
    fn main() {
        // スレッドベース(重い)
        let handles: Vec<_> = (0..1000)
            .map(|i| {
                thread::spawn(move || {
                    thread::sleep(Duration::from_secs(1));
                    println!("Thread {}", i);
                })
            })
            .collect();
    
        // 1000個のOSスレッドを生成!メモリとコンテキストスイッチのコストが高い
    }
    

    use tokio::time::{sleep, Duration};
    
    #[tokio::main]
    async fn main() {
        // 非同期タスク(軽量)
        let handles: Vec<_> = (0..1000)
            .map(|i| {
                tokio::spawn(async move {
                    sleep(Duration::from_secs(1)).await;
                    println!("Task {}", i);
                })
            })
            .collect();
    
        // 1000個の軽量タスク!1つのスレッドで効率的に実行可能
    }
    

    ---

    17.2 async/await 基本

    17.2.1 async 関数

    // 通常の関数
    fn sync_function() -> i32 {
        42
    }
    
    // 非同期関数
    async fn async_function() -> i32 {
        42
    }
    
    fn main() {
        // async関数は Future を返す
        let future = async_function();
    
        // Futureを実行するにはランタイムが必要
    }
    

    17.2.2 await 演算子

    use tokio::time::{sleep, Duration};
    
    async fn say_hello() {
        println!("Hello!");
    }
    
    async fn example() {
        // .await でFutureの完了を待つ
        say_hello().await;
        println!("World!");
    }
    
    #[tokio::main]
    async fn main() {
        example().await;
    }
    

    17.2.3 async ブロック

    #[tokio::main]
    async fn main() {
        let future = async {
            println!("In async block");
            42
        };
    
        let result = future.await;
        println!("Result: {}", result);
    }
    

    ---

    17.3 Future トレイト

    17.3.1 Future の定義

    use std::future::Future;
    use std::pin::Pin;
    use std::task::{Context, Poll};
    
    pub trait Future {
        type Output;
    
        fn poll(
            self: Pin<&mut Self>,
            cx: &mut Context<'_>
        ) -> Poll<Self::Output>;
    }
    
    pub enum Poll<T> {
        Ready(T),      // 完了
        Pending,       // まだ完了していない
    }
    

    17.3.2 簡単なFutureの実装

    use std::future::Future;
    use std::pin::Pin;
    use std::task::{Context, Poll};
    
    struct CountdownFuture {
        count: u32,
    }
    
    impl Future for CountdownFuture {
        type Output = String;
    
        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            if self.count == 0 {
                Poll::Ready("Done!".to_string())
            } else {
                println!("Countdown: {}", self.count);
                self.count -= 1;
                cx.waker().wake_by_ref();
                Poll::Pending
            }
        }
    }
    
    #[tokio::main]
    async fn main() {
        let future = CountdownFuture { count: 3 };
        let result = future.await;
        println!("{}", result);
    }
    

    ---

    17.4 tokio ランタイム

    17.4.1 tokio の基本

    use tokio;
    
    // #[tokio::main] マクロでランタイムを自動生成
    #[tokio::main]
    async fn main() {
        println!("Hello from tokio!");
    }
    
    // 上記は以下と同等:
    fn main() {
        tokio::runtime::Runtime::new()
            .unwrap()
            .block_on(async {
                println!("Hello from tokio!");
            });
    }
    

    17.4.2 マルチスレッドランタイム

    #[tokio::main(flavor = "multi_thread", worker_threads = 4)]
    async fn main() {
        // 4つのワーカースレッドで実行
    }
    

    17.4.3 タスクのスポーン

    use tokio;
    
    #[tokio::main]
    async fn main() {
        let handle = tokio::spawn(async {
            println!("Task running");
            42
        });
    
        let result = handle.await.unwrap();
        println!("Result: {}", result);
    }
    

    ---

    17.5 非同期I/O

    17.5.1 非同期ファイル読み込み

    use tokio::fs::File;
    use tokio::io::{self, AsyncReadExt};
    
    #[tokio::main]
    async fn main() -> io::Result<()> {
        let mut file = File::open("example.txt").await?;
        let mut contents = String::new();
    
        file.read_to_string(&mut contents).await?;
        println!("Contents: {}", contents);
    
        Ok(())
    }
    

    17.5.2 非同期ネットワーク

    use tokio::net::TcpListener;
    use tokio::io::{AsyncReadExt, AsyncWriteExt};
    
    #[tokio::main]
    async fn main() -> std::io::Result<()> {
        let listener = TcpListener::bind("127.0.0.1:8080").await?;
        println!("Server listening on port 8080");
    
        loop {
            let (mut socket, _) = listener.accept().await?;
    
            tokio::spawn(async move {
                let mut buf = [0; 1024];
    
                loop {
                    let n = match socket.read(&mut buf).await {
                        Ok(n) if n == 0 => return,
                        Ok(n) => n,
                        Err(e) => {
                            eprintln!("Error: {}", e);
                            return;
                        }
                    };
    
                    if socket.write_all(&buf[0..n]).await.is_err() {
                        return;
                    }
                }
            });
        }
    }
    

    ---

    17.6 並行実行パターン

    17.6.1 join! - すべて完了を待つ

    use tokio::time::{sleep, Duration};
    
    async fn task1() -> i32 {
        sleep(Duration::from_secs(1)).await;
        1
    }
    
    async fn task2() -> i32 {
        sleep(Duration::from_secs(2)).await;
        2
    }
    
    #[tokio::main]
    async fn main() {
        let (r1, r2) = tokio::join!(task1(), task2());
        println!("Results: {} and {}", r1, r2);  // 2秒後に完了
    }
    

    17.6.2 select! - 最初に完了したものを待つ

    use tokio::time::{sleep, Duration};
    
    async fn task1() -> &'static str {
        sleep(Duration::from_secs(1)).await;
        "task1"
    }
    
    async fn task2() -> &'static str {
        sleep(Duration::from_secs(2)).await;
        "task2"
    }
    
    #[tokio::main]
    async fn main() {
        tokio::select! {
            result = task1() => {
                println!("task1 finished first: {}", result);
            }
            result = task2() => {
                println!("task2 finished first: {}", result);
            }
        }
    }
    

    17.6.3 spawn と JoinHandle

    use tokio;
    
    #[tokio::main]
    async fn main() {
        let handles: Vec<_> = (0..10)
            .map(|i| {
                tokio::spawn(async move {
                    tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
                    i * 2
                })
            })
            .collect();
    
        let results: Vec<_> = futures::future::join_all(handles)
            .await
            .into_iter()
            .map(|r| r.unwrap())
            .collect();
    
        println!("{:?}", results);
    }
    

    ---

    17.7 実践例:HTTPクライアント

    17.7.1 reqwest を使ったHTTPリクエスト

    use reqwest;
    use tokio;
    
    #[tokio::main]
    async fn main() -> Result<(), reqwest::Error> {
        let url = "https://api.github.com/repos/rust-lang/rust";
    
        let response = reqwest::get(url).await?;
        let body = response.text().await?;
    
        println!("Response: {}", body);
    
        Ok(())
    }
    

    17.7.2 並列HTTPリクエスト

    use reqwest;
    use tokio;
    
    async fn fetch_url(url: &str) -> Result<String, reqwest::Error> {
        let response = reqwest::get(url).await?;
        let body = response.text().await?;
        Ok(body)
    }
    
    #[tokio::main]
    async fn main() {
        let urls = vec![
            "https://example.com",
            "https://rust-lang.org",
            "https://github.com",
        ];
    
        let futures: Vec<_> = urls
            .iter()
            .map(|url| fetch_url(url))
            .collect();
    
        let results = futures::future::join_all(futures).await;
    
        for (i, result) in results.iter().enumerate() {
            match result {
                Ok(body) => println!("URL {}: {} bytes", i, body.len()),
                Err(e) => println!("URL {}: Error {}", i, e),
            }
        }
    }
    

    ---

    17.8 エラーハンドリング

    17.8.1 async 関数での Result

    use tokio::fs::File;
    use tokio::io::{self, AsyncReadExt};
    
    async fn read_file(path: &str) -> io::Result<String> {
        let mut file = File::open(path).await?;
        let mut contents = String::new();
        file.read_to_string(&mut contents).await?;
        Ok(contents)
    }
    
    #[tokio::main]
    async fn main() {
        match read_file("example.txt").await {
            Ok(contents) => println!("Contents: {}", contents),
            Err(e) => eprintln!("Error: {}", e),
        }
    }
    

    17.8.2 複数のエラー型

    use std::error::Error;
    
    async fn complex_operation() -> Result<String, Box<dyn Error>> {
        let response = reqwest::get("https://example.com").await?;
        let text = response.text().await?;
    
        let file = tokio::fs::File::create("output.txt").await?;
        // ...
    
        Ok(text)
    }
    

    ---

    17.9 ストリーム

    17.9.1 Stream トレイト

    use futures::stream::{self, StreamExt};
    
    #[tokio::main]
    async fn main() {
        let mut stream = stream::iter(vec![1, 2, 3, 4, 5]);
    
        while let Some(value) = stream.next().await {
            println!("Value: {}", value);
        }
    }
    

    17.9.2 カスタムストリーム

    use futures::stream::{Stream, StreamExt};
    use std::pin::Pin;
    use std::task::{Context, Poll};
    
    struct CounterStream {
        current: u32,
        max: u32,
    }
    
    impl Stream for CounterStream {
        type Item = u32;
    
        fn poll_next(
            mut self: Pin<&mut Self>,
            _cx: &mut Context<'_>
        ) -> Poll<Option<Self::Item>> {
            if self.current < self.max {
                let current = self.current;
                self.current += 1;
                Poll::Ready(Some(current))
            } else {
                Poll::Ready(None)
            }
        }
    }
    
    #[tokio::main]
    async fn main() {
        let mut stream = CounterStream { current: 0, max: 5 };
    
        while let Some(value) = stream.next().await {
            println!("Count: {}", value);
        }
    }
    

    ---

    17.10 ベストプラクティス

    17.10.1 async vs スレッド

    ┌─────────────────────────────────────────────────────────┐
    │          async を使うべき場面                            │
    ├─────────────────────────────────────────────────────────┤
    │                                                         │
    │  ✓ I/O バウンドな処理(ネットワーク、ファイル)          │
    │  ✓ 大量の並行接続                                        │
    │  ✓ レイテンシが重要                                      │
    │                                                         │
    │  【スレッドを使うべき場面】                              │
    │                                                         │
    │  ✓ CPU バウンドな処理                                    │
    │  ✓ 並列計算                                              │
    │  ✓ ブロッキングライブラリを使う場合                      │
    │                                                         │
    └─────────────────────────────────────────────────────────┘
    

    17.10.2 避けるべきパターン

    // 悪い例: async関数内でブロッキング操作
    async fn bad_example() {
        std::thread::sleep(std::time::Duration::from_secs(1));  // NG!
    }
    
    // 良い例: 非同期sleepを使う
    async fn good_example() {
        tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;  // OK!
    }
    
    // ブロッキング操作が必要な場合
    async fn with_blocking() {
        tokio::task::spawn_blocking(|| {
            // ブロッキング操作
            std::thread::sleep(std::time::Duration::from_secs(1));
        }).await.unwrap();
    }
    

    ---

    17.11 まとめ

    非同期プログラミングの利点

    ┌─────────────────────────────────────────────────────────┐
    │          Rustの非同期プログラミング                      │
    ├─────────────────────────────────────────────────────────┤
    │                                                         │
    │  ✓ ゼロコスト抽象化                                      │
    │    └─ 手書き状態機械と同等の性能                        │
    │                                                         │
    │  ✓ メモリ効率                                            │
    │    └─ スレッドより軽量                                  │
    │                                                         │
    │  ✓ 安全性                                                │
    │    └─ 型システムで保証                                  │
    │                                                         │
    │  ✓ 柔軟性                                                │
    │    └─ 複数のランタイムを選択可能                        │
    │                                                         │
    └─────────────────────────────────────────────────────────┘
    

    次のステップ

    次の章では、unsafe Rustについて学びます。rawポインタ、FFI、Cライブラリとの連携を理解します。

    ---

    参考資料

  • Async Book
  • tokio documentation
  • futures documentation