Rust의 비동기 프로그래밍: Futures, Executors, 작업 스케줄링
Daniel Hayes
Full-Stack Engineer · Leapcell

Future의 정의
Future
는 Rust의 비동기 프로그래밍의 핵심입니다. 다음은 Future
트레이트의 정의입니다.
#[must_use = "futures do nothing unless you `.await` or poll them"] #[lang = "future_trait"] pub trait Future { type Output; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>; } #[must_use = "this `Poll` may be a `Pending` variant, which should be handled"] pub enum Poll<T> { Ready(T), Pending, }
Future
는 연관된 타입 Output
과 Poll<Self::Output>
를 반환하는 poll()
메서드를 가집니다. Poll
은 Ready
와 Pending
두 가지 variant를 가진 enum입니다. poll()
메서드를 호출함으로써, Future
는 작업이 완료되고 스위치 아웃될 때까지 완료를 향해 더 진행할 수 있습니다.
현재
poll
호출에서,Future
가 완료되면Poll::Ready(result)
를 반환하며, 이는Future
의 값이 반환됨을 의미합니다.Future
가 아직 완료되지 않았으면Poll::Pending()
을 반환합니다. 이 시점에서Future
는 일시 중단되고 일부 이벤트(wake 함수를 통해)에 의해 깨어나기를 기다립니다.
Executor (실행 스케줄러)
executor는 Future
에 대한 스케줄러입니다. 운영 체제가 스레드 스케줄링을 담당하지만, 사용자 공간 코루틴(예: Future
s)은 스케줄링하지 않습니다. 따라서, 동시성을 위해 코루틴을 사용하는 모든 프로그램은 스케줄링을 처리할 executor가 필요합니다.
Rust의 Future
는 lazy합니다. 오직 polled될 때만 실행됩니다. 이를 구동하는 한 가지 방법은 .await
를 사용하여 async 함수 내에서 다른 async 함수를 호출하는 것이지만, 이는 async 함수 자체 내에서만 문제를 해결합니다. 가장 바깥쪽의 async 함수는 여전히 executor에 의해 구동되어야 합니다.
Executor 런타임
Rust는 Future
와 같은 코루틴을 제공하지만, 언어 수준에서 executor를 제공하지 않습니다. 코루틴이 사용되지 않으면 런타임을 가져올 필요가 없습니다. 필요한 경우, 생태계는 다양한 executor를 선택할 수 있도록 제공합니다.
Rust에서 흔히 사용되는 4가지 executor는 다음과 같습니다.
futures
: 이 라이브러리는 간단한 내장 executor와 함께 제공됩니다.tokio
: executor를 제공합니다.#[tokio::main]
을 사용하면 Tokio의 executor가 암묵적으로 포함됩니다.async-std
: Tokio와 유사한 executor를 제공합니다.smol
:async-executor
를 제공하며, 주로block_on
을 노출합니다.
Wake 알림 메커니즘
executor는 Future
그룹(일반적으로 가장 바깥쪽의 async 함수)을 관리하고, 완료될 때까지 지속적으로 polling하여 진행합니다. 처음에 executor는 Future
를 한 번 poll합니다. 그 후에는 다시 적극적으로 poll하지 않습니다. poll
메서드가 Poll::Pending
을 반환하면, Future
는 일부 이벤트가 wake()
함수를 통해 깨울 때까지 일시 중단됩니다. 그러면 Future
는 executor에 적극적으로 알리고, polling을 재개하고 작업 실행을 계속하도록 프롬프트할 수 있습니다. 이 wake-then-poll 주기는 Future
가 완료될 때까지 반복됩니다.
Waker
는 연결된 작업이 재개될 준비가 되었음을 executor에게 알리는 wake()
메서드를 제공하여 executor가 해당 Future
를 다시 poll할 수 있도록 합니다.
Context
는 Waker
를 감싸는 wrapper입니다. poll
메서드에서 사용되는 Context
구조체를 살펴봅시다.
pub struct Context<'a> { waker: &'a Waker, _marker: PhantomData<fn(&'a ()) -> &'a ()>, }
Waker
의 정의와 구현은 상당히 추상적입니다. 내부적으로 가상 함수 테이블(vtable)을 사용하여 다양한 waker
동작을 허용합니다.
pub struct RawWakerVTable { clone: unsafe fn(*const ()) -> RawWaker, wake: unsafe fn(*const ()), wake_by_ref: unsafe fn(*const ()), drop: unsafe fn(*const ()), }
Rust 자체는 async 런타임을 제공하지 않습니다. 표준 라이브러리에서 기본 인터페이스만 정의하고 런타임 동작은 타사 런타임에 의해 구현되도록 합니다. 따라서 표준 라이브러리에서는 인터페이스 정의와 일부 고급 구현만 볼 수 있습니다. 예를 들어 Waker
의 wake()
메서드는 단순히 vtable의 해당 함수에 대한 호출을 위임합니다.
impl Waker { /// Wake up the task associated with this `Waker`. #[inline] pub fn wake(self) { // The actual wakeup call is delegated through a virtual function call // to the implementation which is defined by the executor. let wake = self.waker.vtable.wake; let data = self.waker.data; // Don't call `drop` -- the waker will be consumed by `wake`. crate::mem::forget(self); // SAFETY: This is safe because `Waker::from_raw` is the only way // to initialize `wake` and `data` requiring the user to acknowledge // that the contract of `RawWaker` is upheld. unsafe { (wake)(data) }; } ... }
vtable의 실제 구현은 표준 라이브러리에 있지 않고, futures
crate의 런타임과 같은 타사 async 런타임에서 제공됩니다.
타이머 구축
타이머 예제를 사용하여 Future
의 스케줄링 메커니즘을 이해해 보겠습니다. 목표는 다음과 같습니다. 타이머를 생성할 때 특정 기간 동안 sleep하는 새 스레드가 생성되고, 해당 시간이 지나면 타이머 Future
에 신호를 보냅니다.
참고: futures
crate에서 Waker
를 구성하는 편리한 방법을 제공하는 ArcWake
트레이트가 필요합니다. Cargo.toml
을 편집하고 다음 의존성을 추가합니다.
[dependencies] futures = "0.3"
타이머 Future
에 대한 전체 코드:
// future_timer.rs use futures; use std::{ future::Future, pin::Pin, sync::{Arc, Mutex}, task::{Context, Poll, Waker}, thread, time::Duration, }; pub struct TimerFuture { shared_state: Arc<Mutex<SharedState>>, } /// Future와 sleeping 스레드 간의 공유 상태 struct SharedState { /// 타이머(sleep)가 완료되었는지 여부를 나타냅니다. completed: bool, /// Sleep이 끝나면 스레드는 이 `waker`를 사용하여 `TimerFuture`에 작업 깨우기를 알릴 수 있습니다. waker: Option<Waker>, } impl Future for TimerFuture { type Output = (); fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { // 공유 상태를 확인하여 타이머가 완료되었는지 확인합니다. let mut shared_state = self.shared_state.lock().unwrap(); if shared_state.completed { println!("future ready. execute poll to return."); Poll::Ready(()) } else { println!("future not ready, tell the future task how to wakeup to executor"); // `Future`가 다시 polled될 수 있도록 sleep이 완료되면 새 스레드가 작업을 깨울 수 있도록 `waker`를 설정합니다. // 여기서 `clone`은 모든 `poll`에서 발생하지만, 이상적으로는 한 번만 발생해야 합니다. // 매번 복제하는 이유는 `TimerFuture`가 executor에서 작업 간에 이동할 수 있기 때문입니다. // 단일 `waker` 인스턴스가 변경되어 잘못된 작업을 가리킬 수 있으며, // executor가 잘못된 작업을 실행할 수 있습니다. shared_state.waker = Some(cx.waker().clone()); Poll::Pending } } } impl TimerFuture { /// 지정된 기간 후에 완료되는 새 `TimerFuture`를 만듭니다. pub fn new(duration: Duration) -> Self { let shared_state = Arc::new(Mutex::new(SharedState { completed: false, waker: None, })); // 새 스레드를 생성합니다. let thread_shared_state = shared_state.clone(); thread::spawn(move || { // 타이머를 시뮬레이션하기 위해 지정된 기간 동안 sleep합니다. thread::sleep(duration); let mut shared_state = thread_shared_state.lock().unwrap(); // 타이머가 완료되었음을 executor에 알리고 해당 `Future`를 다시 polled할 수 있습니다. shared_state.completed = true; if let Some(waker) = shared_state.waker.take() { println!("detect future is ready, wakeup the future task to executor."); waker.wake() } }); TimerFuture { shared_state } } } fn main() { // 아직 자체 executor를 구현하지 않았으므로 `futures` crate에서 제공하는 executor를 사용합니다. futures::executor::block_on(TimerFuture::new(Duration::new(10, 0))); }
실행 결과:
future not ready, tell the future task how to wakeup to executor detect future is ready, wakeup the future task to executor. future ready. execute poll to return.
위에서 볼 수 있듯이, 처음에는 10초 타이머가 완료되지 않았고 Pending
상태입니다. 이 시점에서 작업이 준비되면 자체를 깨우는 방법을 작업에 알려야 합니다. 10초 후 타이머가 완료되고 이전에 설정된 Waker
를 사용하여 실행할 Future
작업을 깨웁니다.
Executor 구축
이전 코드에서는 자체 스케줄러를 구현하지 않고 futures
crate에서 제공하는 executor를 사용했습니다. 이제 작동 방식을 이해하기 위해 자체 executor를 직접 구축해 보겠습니다. 그러나 실제 Rust async 프로그래밍에서는 일반적으로 tokio
라이브러리를 사용합니다. 여기서는 async가 어떻게 작동하는지 더 잘 이해하기 위해 학습 목적으로 처음부터 빌드합니다.
키 코드:
// future_executor.rs use { futures::{ future::{BoxFuture, FutureExt}, task::{waker_ref, ArcWake}, }, std::{ future::Future, sync::mpsc::{sync_channel, Receiver, SyncSender}, sync::{Arc, Mutex}, task::Context, time::Duration, }, }; mod future_timer; // 이전에 구현한 타이머 모듈을 가져옵니다. use future_timer::TimerFuture; /// 작업 executor: 채널에서 작업을 수신하고 실행하는 역할을 합니다. struct Executor { ready_queue: Receiver<Arc<Task>>, } /// `Spawner`는 새 `Future`를 만들고 작업 채널로 보내는 역할을 합니다. #[derive(Clone)] struct Spawner { task_sender: SyncSender<Arc<Task>>, } /// 자체를 예약할 수 있는 `Future`(자체를 작업 채널로 보내서) 및 실행될 때까지 기다립니다. struct Task { /// 미래의 어느 시점에 완료될 진행 중인 `Future` /// /// 기술적으로 여기에서 `Mutex`는 모든 것을 단일 스레드에서 실행하기 때문에 불필요합니다. /// 그러나 Rust는 `Future`가 스레드 간에 공유되지 않는다는 것을 알 수 없으므로 /// `Mutex`를 사용하여 스레드 안전성에 대한 컴파일러 요구 사항을 충족합니다. /// /// 프로덕션급 executor는 오버헤드를 발생시키기 때문에 여기에서 `Mutex`를 사용하지 않습니다. /// 대신 `UnsafeCell`을 사용합니다. future: Mutex<Option<BoxFuture<'static, ()>>>, /// 이 작업이 자신을 작업 큐에 다시 제출하여 executor가 `poll`할 때까지 기다릴 수 있습니다. task_sender: SyncSender<Arc<Task>>, } fn new_executor_and_spawner() -> (Executor, Spawner) { // 작업 채널에서 버퍼링된 최대 작업 수(큐 길이) // 이 구현은 단순화되었습니다. 실제 executor는 다르게 처리합니다. const MAX_QUEUED_TASKS: usize = 10_000; let (task_sender, ready_queue) = sync_channel(MAX_QUEUED_TASKS); (Executor { ready_queue }, Spawner { task_sender }) } impl Spawner { fn spawn(&self, future: impl Future<Output = ()> + 'static + Send) { let future = future.boxed(); let task = Arc::new(Task { future: Mutex::new(Some(future)), task_sender: self.task_sender.clone(), }); println!("first dispatch the future task to executor."); self.task_sender.send(task).expect("too many tasks queued."); } } /// 작업을 깨우고 실행하도록 예약하는 방법을 정의하기 위해 `ArcWake`를 구현합니다. impl ArcWake for Task { fn wake_by_ref(arc_self: &Arc<Self>) { // Wake는 작업을 다시 작업 채널로 보내서 구현됩니다. // 따라서 executor는 다시 `poll`합니다. let cloned = arc_self.clone(); arc_self .task_sender .send(cloned) .expect("too many tasks queued"); } } impl Executor { /// 작업을 지속적으로 수신하고 실행하여 실제로 `Future` 작업을 실행합니다. fn run(&self) { let mut count = 0; while let Ok(task) = self.ready_queue.recv() { count += 1; println!("received task. {}", count); // Future를 검색합니다. 완료되지 않았으면(여전히 Some) `poll`하고 완료하려고 시도합니다. let mut future_slot = task.future.lock().unwrap(); if let Some(mut future) = future_slot.take() { // 작업 자체를 기반으로 `LocalWaker`를 만듭니다. let waker = waker_ref(&task); let context = &mut Context::from_waker(&*waker); // `BoxFuture<T>`는 `Pin<Box<dyn Future<Output = T> + Send + 'static>>`의 별칭입니다. // `as_mut`은 이것을 `Pin<&mut dyn Future + Send + 'static>`으로 변환합니다. if future.as_mut().poll(context).is_pending() { println!("executor run the future task, but is not ready, create a future again."); // Future가 아직 완료되지 않았으므로 다시 놓고 다음 poll을 기다립니다. *future_slot = Some(future); } else { println!("executor run the future task, is ready. the future task is done."); } } } } } fn main() { let (executor, spawner) = new_executor_and_spawner(); // TimerFuture를 작업으로 래핑하고 실행을 위해 스케줄러에 디스패치합니다. spawner.spawn(async { println!("TimerFuture await"); // 타이머 Future를 만들고 완료될 때까지 기다립니다. TimerFuture::new(Duration::new(10, 0)).await; println!("TimerFuture Done"); }); // executor가 더 이상 작업이 제출되지 않는다는 것을 알 수 있도록 spawner를 삭제합니다. drop(spawner); // 작업 큐가 비워질 때까지 executor를 실행합니다. // 작업이 실행되면 "howdy!"를 출력하고 2초 동안 일시 중지한 다음 "done!"을 출력합니다. executor.run(); }
실행 결과:
first dispatch the future task to executor. received task. 1 TimerFuture await future not ready, tell the future task how to wakeup to executor executor run the future task, but is not ready, create a future again. detect future is ready, wakeup the future task to executor. received task. 2 future ready. execute poll to return. TimerFuture Done executor run the future task, is ready. the future task is done.
첫 번째 스케줄링 시도에서 작업은 아직 준비되지 않았고 Pending
을 반환합니다. 그런 다음 준비가 되면 어떻게 깨어나야 하는지 작업에 알려줍니다. 나중에 이벤트가 준비되면 작업이 지시된 대로 깨어나고 실행되도록 예약됩니다.
비동기 처리 흐름
Reactor 패턴은 고성능 이벤트 기반 시스템을 구축하는 데 사용되는 고전적인 디자인 패턴입니다. executor와 reactor는 이 패턴의 구성 요소입니다. Reactor 패턴은 세 가지 주요 부분으로 구성됩니다.
- 작업: 실행할 작업 단위입니다. 작업은 일시 중지하고 executor에 제어를 양도하여 나중에 다시 예약될 때까지 기다릴 수 있습니다.
- Executor: 실행할 준비가 된 작업(ready queue)과 차단된 작업(wait queue)을 유지 관리하는 스케줄러입니다.
- Reactor: 이벤트 큐를 유지 관리합니다. 이벤트가 발생하면 executor에 특정 작업을 깨워서 실행하도록 알립니다.
executor는 작업을 실행하도록 예약합니다. 작업이 진행할 수 없지만 아직 완료되지 않은 경우 일시 중단되고 적절한 깨우기 조건이 등록됩니다. 나중에 reactor가 깨우기 조건을 충족하는 이벤트를 수신하면 일시 중단된 작업을 깨웁니다. 그런 다음 executor는 이 작업의 polling을 다시 시작할 수 있습니다. 이 주기는 작업이 완료될 때까지 반복됩니다.
Future
를 통한 Rust의 비동기 처리는 Reactor 패턴의 일반적인 구현입니다.
tokio
를 예로 들면:async/await
는 구문 수준 지원을 제공하고Future
는 비동기 작업을 나타내는 데이터 구조입니다..await
가 호출되면 executor는 작업을 예약하고 실행합니다.
Tokio의 스케줄러는 여러 스레드에서 실행됩니다. 각 스레드는 자체 _ready queue_에서 작업을 실행합니다. 스레드의 큐가 비어 있으면 다른 스레드의 큐에서 작업을 훔칠 수 있습니다(작업 훔치기라는 전략). 작업이 더 이상 진행할 수 없고
Poll::Pending
을 반환하면 스케줄러는 작업을 일시 중단하고Waker
를 사용하여 적절한 깨우기 조건을 설정합니다. reactor는 OS의 비동기 I/O 메커니즘(epoll
,kqueue
또는IOCP
와 같은)을 사용하여 I/O 이벤트를 모니터링합니다. 관련 이벤트가 트리거되면 reactor는Waker::wake()
를 호출하여 일시 중단된Future
를 깨웁니다.Future
는 _ready queue_에 다시 배치되고 실행을 기다립니다.
요약
Future
는 Rust의 비동기 프로그래밍 모델의 핵심 추상화로, 미래의 어느 시점에 완료될 작업을 나타냅니다. Rust의 Future
는 lazy하며, 이를 구동하려면 executor가 필요합니다. 이 실행은 polling을 통해 구현됩니다.
- 현재 polling 주기에서
Future
가 완료되면Poll::Ready(result)
를 반환하여 최종 값을 제공합니다. Future
가 아직 완료되지 않았으면Poll::Pending()
을 반환합니다. 이 시점에서Future
는 일시 중단되고Waker
를 통해 깨우기 위해 외부 이벤트를 기다립니다.
Waker
는 executor에 재개해야 할 작업을 알리는 wake()
메서드를 제공합니다. wake()
가 호출되면 executor는 Waker
와 연결된 작업이 진행할 준비가 되었음을 알고 Future
를 다시 pollin합니다. 이 wake → poll → suspend 주기는 Future
가 최종적으로 완료될 때까지 계속됩니다.
각 비동기 작업은 일반적으로 세 단계를 거칩니다.
- Polling 단계: executor는
Future
에서 polling을 시작합니다. 더 이상 진행할 수 없는 지점(Poll::Pending
)에 도달하면 작업이 일시 중단되고 대기 단계로 들어갑니다. - 대기 단계: 이벤트 소스(reactor라고 함)는 이벤트를 기다리기 위해
Waker
를 등록합니다. 이벤트가 발생하면Waker
를 트리거하여 연결된Future
를 깨우고 깨우기 단계로 전환합니다. - 깨우기 단계: 이벤트가 발생하면 해당
Future
가Waker
에 의해 깨어납니다. executor는Future
를 다시 polling하도록 예약합니다. 작업은 완료되거나 다른Pending
지점에 도달할 때까지 진행됩니다. 이 주기는 작업이 완전히 완료될 때까지 반복됩니다.
Rust 프로젝트 호스팅을 위한 최고의 선택, Leapcell입니다.
Leapcell은 웹 호스팅, 비동기 작업 및 Redis를 위한 차세대 서버리스 플랫폼입니다.
다국어 지원
- Node.js, Python, Go 또는 Rust로 개발하십시오.
무료로 무제한 프로젝트 배포
- 사용량에 대해서만 지불하십시오. 요청도 없고 요금도 없습니다.
탁월한 비용 효율성
- 유휴 요금 없이 사용한 만큼만 지불하십시오.
- 예: $25는 평균 응답 시간 60ms에서 694만 건의 요청을 지원합니다.
간소화된 개발자 경험
- 간편한 설정을 위한 직관적인 UI
- 완전 자동화된 CI/CD 파이프라인 및 GitOps 통합
- 실행 가능한 통찰력을 위한 실시간 메트릭 및 로깅
손쉬운 확장성 및 고성능
- 손쉽게 높은 동시성을 처리할 수 있도록 자동 확장
- 운영 오버헤드 제로 - 구축에만 집중하십시오.
Documentation에서 더 자세히 알아보십시오!
X에서 팔로우하세요: @LeapcellHQ