RustのカスタムFutureによるポーリングの理解
Takashi Yamamoto
Infrastructure Engineer · Leapcell

はじめに
非同期プログラミングは、特にネットワーキング、I/Oバウンドタスク、高同時実行システムといった分野で、高性能で応答性の高いアプリケーションを構築するための不可欠なパラダイムとなっています。Rustは、その強力な型システムと所有権モデルにより、Future
トレイトに基づいた、強力で安全な非同期プログラミングのアプローチを提供します。async/await
構文を通じてFutureを操作することが多いですが、これらの抽象化が内部でどのように機能するかを真に理解することは、デバッグ、最適化、さらにはカスタム非同期コンポーネントの設計においても重要です。カスタムFuture
を作成するこの詳細な解説は、ポーリングメカニズムを解明し、非同期タスクとエグゼキュータ間の基本的なやり取りを明らかにし、最終的にはRustの非同期機能をより自信を持って精密に使いこなせるようにします。
非同期実行の心臓部:ポーリング
カスタムFutureを構築する前に、関連するコアコンセプトを明確に理解しましょう。
- Futureトレイト: Rustでは、
Future
は、完了している場合もしていない場合もある非同期計算を表すトレイトです。これには単一のメソッドpoll
があり、エグゼキュータはFutureの進捗状況を確認するためにこれを繰り返し呼び出します。 - エグゼキュータ: エグゼキュータは、
Future
を取得し、poll
メソッドを繰り返し呼び出すことで完了まで駆動する責任を負います。Futureのライフサイクルを管理し、タスクをスケジュールし、タスクが進捗の準備ができたときにそれをウェイクアップさせることを処理します。一般的なエグゼキュータにはtokio
やasync-std
があります。 - ポーリング: これは、エグゼキュータが未完了の
Future
に対してpoll
メソッドを呼び出す行為です。poll
が呼び出されると、Futureは進捗の試みを行います。 Poll
enum:poll
メソッドはPoll
enumを返します。これには2つのバリアントがあります。Poll::Ready(T)
: Futureが正常に完了したことを示し、T
はその計算結果です。Poll::Pending
: Futureがまだ完了していないことを示します。Pending
が返された場合、Futureは(Waker
を介して)進捗の準備ができたときにウェイクアップされるように手配しなければなりません。
Waker
:Waker
は、エグゼキュータによって提供される低レベルのメカニズムであり、Future
が再度ポーリングされる準備ができたことをシグナルすることができます。FutureがPoll::Pending
を返すと、Context
からWaker
を取得してクローンします。後で、ソケットへのデータ到着、タイマーの期限切れなど、Futureのブロックを解除する可能性のあるイベントが発生すると、Futureはwaker.wake_by_ref()
を呼び出して、リポーリングするようにエグゼキュータに通知します。Context
:poll
メソッドに渡されるContext
には、Waker
と、Futureがエグゼキュータと対話するのに役立つその他の情報が含まれています。
カスタムFutureの構築:シンプルな遅延
非ブロッキング遅延を導入するカスタムFuture
を作成してみましょう。これにより、ポーリングメカニズムを直接観察できます。
deadline
(完了すべき時間)とオプションのWaker
(タスクをウェイクアップするため)を保持するDelay
構造体を定義します。
use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll, Waker}; use std::sync::{Arc, Mutex}; use std::time::{Duration, Instant}; use std::thread; // 遅延Futureの状態を表す struct Delay { deadline: Instant, // デッドラインが経過したときにFutureをウェイクアップするためにWakerを格納する必要があります。 // Arc<Mutex<Option<Waker>>>は、スレッド間でWakerを共有し安全に編集することを可能にします。 waker_storage: Arc<Mutex<Option<Waker>>>, // タイマーThreadが一度だけスポーンされることを保証するフラグ。 timer_thread_spawned: bool, } impl Delay { fn new(duration: Duration) -> Self { Delay { deadline: Instant::now() + duration, waker_storage: Arc::new(Mutex::new(None)), timer_thread_spawned: false, } } } // Delay構造体に対するFutureトレイトを実装 impl Future for Delay { // Futureの出力型はユニットです。遅延を表すだけなので。 type Output = (); // Futureの核心:pollメソッド fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { // デッドラインが既に経過している場合、Futureは準備完了です。 if Instant::now() >= self.deadline { println!("Delay future: Deadline reached. Returning Ready."); return Poll::Ready(()); } // --- Wakerの格納とタイマーの設定(一度だけ) --- // タイマーThreadがまだスポーンされていない場合、それを設定します。 if !self.timer_thread_spawned { println!("Delay future: First poll. Storing waker and spawning timer thread."); // コンテキストから現在のWakerを格納します。 // このWakerは、タイマースレッドがこのタスクをウェイクアップするために使用します。 let mut waker_guard = self.waker_storage.lock().unwrap(); *waker_guard = Some(cx.waker().clone()); drop(waker_guard); // ロックを早期に解放 // 新しいThreadに渡すためにArcをクローンします。 let waker_storage_clone = self.waker_storage.clone(); let duration_until_deadline = self.deadline.duration_since(Instant::now()); // デッドラインまでスリープし、元のタスクをウェイクアップする新しいThreadをスポーンします。 thread::spawn(move || { thread::sleep(duration_until_deadline); println!("Delay timer thread: Deadline passed. Waking up the task."); // Wakerを取得してタスクをウェイクアップします if let Some(waker) = waker_storage_clone.lock().unwrap().take() { waker.wake(); } }); // 複数回タイマーThreadをスポーンしないように、タイマーThreadがスポーンされたことをマークします。 self.timer_thread_spawned = true; } else { // この部分は、タイマースレッドが既に実行されている場合の後続のポーリングを処理します。 // タスクが移動または再スケジュールされるとエグゼキュータが判断した場合、Wakerを更新することが重要です。 // Wakerが更新されないと、以前のWakerが無効になり、タスクがウェイクアップされなくなる可能性があります。 let mut waker_guard = self.waker_storage.lock().unwrap(); // waker_guard.as_ref().map_or(true, |w| !w.will_wake(cx.waker())) は、Wakerが存在しないか、または新しいWakerが古いWakerとは異なる(will_wakeがfalseを返す)場合にtrueになります。 // これは、タスクが移動されたり、エグゼキュータが異なるWakeerを提供したりした場合にWakerを更新するために必要です。 if waker_guard.as_ref().map_or(true, |w| !w.will_wake(cx.waker())) { println!("Delay future: Waker changed. Updating."); *waker_guard = Some(cx.waker().clone()); } } // デッドラインがまだ経過していない場合、Futureはペンディング状態です。 // タイマーThreadによって`waker.wake()`が呼び出されると、リポーリングされます。 println!("Delay future: Deadline not yet reached. Returning Pending."); Poll::Pending } } #[tokio::main] async fn main() { println!("Main: Starting program."); let delay_future = Delay::new(Duration::from_secs(2)); // 2秒の遅延を作成 println!("Main: Awaiting delay future..."); delay_future.await; // カスタムFutureを待機 println!("Main: Delay completed. Program finished."); }
Delay
Futureの説明:
-
struct Delay
:deadline
: 遅延が終了すべき時点を表すInstant
。waker_storage
:Arc<Mutex<Option<Waker>>>
は不可欠です。Waker
はFuture
(self.waker_storage
を所有している)と、wake
を呼び出す別個のthread::spawn
の間で共有される必要があります。Arc
は共有所有権を可能にし、Mutex
はWaker
を格納および取得するための安全な内部可変性を提供します。Option
は、Waker
が格納される前の最初のpoll
で利用可能でない場合があるため使用されます。timer_thread_spawned
: タイマー(thread::spawn
部分)のみを一度設定することを保証する単純なブールフラグ。
-
impl Future for Delay
:type Output = ();
: 私たちの遅延Futureは単に完了し、意味のある値は生成しません。poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>
: これが核心です。if Instant::now() >= self.deadline
: 各ポーリングで、デッドラインが経過したかどうかを確認します。経過していれば、Ready
であり、Poll::Ready(())
を返します。if !self.timer_thread_spawned
: この条件ブロックは、実際のタイマー(thread::spawn
部分)を一度だけ設定することを保証します。let mut waker_guard = self.waker_storage.lock().unwrap(); *waker_guard = Some(cx.waker().clone());
:waker_storage
のロックを取得し、現在のContext
からWaker
をclone
して格納します。このWaker
は、現在ポーリングされているこの特定のタスクを指します。thread::spawn(...)
: 標準のRustスレッドを起動します。このスレッドは、残りの期間sleep()
します。これはヘルパーThreadの観点からはブロッキングsleep
ですが、別のOS Thread内にあるためエグゼキュータThreadをブロックしません。- スポーンされたThread内で、スリープを完了した後、格納された
Waker
を取得しwaker.wake()
を呼び出します。このwake()
呼び出しは、非同期ランタイム(main
のTokio)に、このWaker
に関連付けられたタスクが再度ポーリングの準備ができたことを通知します。 self.timer_thread_spawned = true;
: 複数回タイマーThreadをスポーンしないように、フラグをtrueに設定します。
else { ... }
: タイマースレッドが既にスポーンされている場合(つまり、既にペンディング状態のFutureを再度ポーリングする場合)、Context
内のWaker
が変更されたかどうか(!w.will_wake(cx.waker())
)を確認する必要があります。変更された場合は、格納されたWaker
を更新します。これは、エグゼキュータがタスクを移動または再スケジュールすることがあり、タスクを正しく通知するために新しいWaker
が必要になる場合があるため重要です。Poll::Pending
: デッドラインが経過しておらず、タイマーが設定されている場合、Futureはまだ待機中です。Poll::Pending
を返します。エグゼキュータは、waker.wake()
が呼び出されるまで、このFutureのポーリングを停止します。
tokio::main
およびawait
での動作:
Delay::new(Duration::from_secs(2))
:Delay
インスタンスが作成されます。delay_future.await
: ここが魔法が起こる場所です。- Tokioのエグゼキュータが
delay_future
を受け取ります。 - 最初のポーリング: エグゼキュータは
delay_future.poll(cx)
を呼び出します。- デッドラインは満たされていません。
timer_thread_spawned
はfalse
です。cx
からのWaker
がクローンされ、delay_future.waker_storage
に格納されます。- 新しい
thread::spawn
が作成されます。このスレッドは2秒間スリープを開始します。 timer_thread_spawned
がtrue
に設定されます。poll
はPoll::Pending
を返します。
Poll::Pending
後のエグゼキュータのアクション: エグゼキュータはdelay_future
が準備できていないことを認識します。このタスクを脇に置き、他の準備完了タスク(存在する場合)をポーリングするか、waker.wake()
の呼び出しを待ちます。重要なのは、Tokioランタイムスレッドは、私たちのthread::spawn
のthread::sleep
によってブロックされないことです。- 2秒後:
thread::spawn
のthread::sleep
が完了します。- 格納された
Waker
を取得し、waker.wake()
を呼び出します。
- 格納された
waker.wake()
後のエグゼキュータのアクション: エグゼキュータは、delay_future
に関連付けられたタスクのウェイクアップシグナルを受信します。エグゼキュータは、delay_future
を再度ポーリングするようにスケジュールします。- 2番目(またはそれ以降)のポーリング: エグゼキュータは
delay_future.poll(cx)
を再度呼び出します。- ここで、
Instant::now() >= self.deadline
がtrueになります。 poll
はPoll::Ready(())
を返します。
- ここで、
- 完了:
delay_future.await
式は最終的に完了し、main
関数が続行されます。
- Tokioのエグゼキュータが
結論
カスタムDelay
Futureを実装することにより、Rustの非同期ポーリングメカニズムについて実践的な理解を得ました。エグゼキュータによってFuture::poll
が繰り返し呼び出されること、Poll::Pending
が未完了の状態を示すこと、そして重要なこととして、Waker
が進捗が可能になったときにエグゼキュータにポーリングを再開するように通知することを、進捗の橋渡しとして機能することを確認しました。Waker
を介したFuture
とExecutor
間のこの明確なやり取りは、Rustの効率的で非ブロッキングな非同期プログラミングの基盤であり、ブロッキングスレッドのオーバーヘッドなしで高性能でスケーラブルなアプリケーションを可能にします。カスタムFuture
実装を習得することは、Rustの強力な非同期エコシステムへのより深い洞察を解き放つ高度なスキルです。