Rust의 비동기 프로그래밍: Stream 트레이트와 디자인
Ethan Miller
Product Engineer · Leapcell

Stream
트레이트는 Future
트레이트와 유사합니다. Future
는 단일 항목의 상태 변화를 나타내는 반면, Stream
은 표준 라이브러리의 Iterator
트레이트와 유사하게 완료되기 전에 여러 값을 생성할 수 있습니다. 간단히 말해서, Stream
은 일련의 Futures
로 구성되며, Stream
이 완료될 때까지 각 Future
의 결과를 읽을 수 있습니다.
Stream 정의
Future
는 비동기 프로그래밍에서 가장 기본적인 개념입니다. Future
가 일회성 비동기 값을 나타낸다면, Stream
은 일련의 비동기 값을 나타냅니다. Future
는 1인 반면, Stream
은 0, 1 또는 N입니다. Stream
의 서명은 다음과 같습니다.
pub trait Stream { type Item; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>; }
Stream
의 개념은 동기 기본 요소의 Iterator
에 해당합니다. 그들의 서명이 얼마나 비슷한지 떠올려 보세요!
pub trait Iterator { type Item; fn next(&mut self) -> Option<Self::Item>; }
Stream은 연속적인 데이터 소스를 추상화하는 데 사용되며, 끝날 수도 있습니다(poll
이 None
을 반환할 때)
Stream
의 일반적인 예는 futures
크레이트의 메시지 채널에 있는 소비자 Receiver
입니다. 메시지가 Send
측에서 전송될 때마다 수신기는 Some(val)
값을 얻습니다. Send
측이 닫히고 (드롭) 채널에 더 이상 메시지가 없으면 None
을 받습니다.
use futures::channel::mpsc; use futures::{executor::block_on, SinkExt, StreamExt}; async fn send_recv() { const BUFFER_SIZE: usize = 10; let (mut tx, mut rx) = mpsc::channel::<i32>(BUFFER_SIZE); println!("tx: Send 1, 2"); tx.send(1).await.unwrap(); tx.send(2).await.unwrap(); drop(tx); // `StreamExt::next`는 `Iterator::next`와 유사하지만 값을 반환하는 대신 // `Future<Output = Option<T>>`를 반환하므로 실제 값을 얻으려면 `.await`가 필요합니다. assert_eq!(Some(1), rx.next().await); assert_eq!(Some(2), rx.next().await); assert_eq!(None, rx.next().await); } fn main() { block_on(send_recv()); }
Iterator와 Stream의 차이점
Iterator
는None
을 반환할 때까지next()
메서드를 반복적으로 호출하여 새 값을 얻을 수 있습니다.Iterator
는 블로킹됩니다.next()
에 대한 각 호출은 결과가 얻어질 때까지 CPU를 점유합니다. 대조적으로, 비동기Stream
은 비블로킹이며 기다리는 동안 CPU를 양보합니다.Stream
의poll_next()
메서드는Future
의poll()
메서드와 매우 유사하며, 그 기능은Iterator
의next()
메서드와 유사합니다. 그러나poll_next()
를 직접 호출하는 것은 매우 불편합니다. 왜냐하면Poll
상태를 수동으로 처리해야 하기 때문입니다. 이것은 매우 인체공학적이지 않습니다. 그렇기 때문에 Rust는Stream
에 대한 확장 트레이트인StreamExt
를 제공하며,Next
구조체에 의해 구현된Future
를 반환하는next()
메서드를 제공합니다. 이렇게 하면stream.next().await
를 사용하여 값을 직접 반복할 수 있습니다.
참고:
StreamExt
는 _Stream Extension_을 의미합니다. Rust에서는 최소한의 트레이트 정의(Stream
과 같은)를 하나의 파일에 유지하고 추가 API(StreamExt
와 같은)를 별도의 관련 파일에 넣는 것이 일반적인 관행입니다.
참고:
Future
와 달리Stream
트레이트는 아직 Rust의 핵심 라이브러리(std::core
)에 없습니다. 이것은futures-util
크레이트에 있으며,StreamExtensions
도 표준 라이브러리의 일부가 아닙니다. 이는 다른 라이브러리가 충돌하는 imports를 제공할 수 있음을 의미합니다. 예를 들어, Tokio는futures-util
과 별도로 자체StreamExt
를 제공합니다. 가능하다면 async/await에 가장 일반적으로 사용되는 크레이트인futures-util
을 사용하는 것이 좋습니다.
StreamExt
의 next()
메서드와 Next
구조체의 구현:
pub trait StreamExt: Stream { fn next(&mut self) -> Next<'_, Self> where Self: Unpin { assert_future::<Option<Self::Item>, _>(Next::new(self)) } } // `next`는 `Next` 구조체를 반환합니다. pub struct Next<'a, St: ?Sized> { stream: &'a mut St, } // Stream이 Unpin이면 Next도 Unpin입니다. impl<St: ?Sized + Unpin> Unpin for Next<'_, St> {} impl<'a, St: ?Sized + Stream + Unpin> Next<'a, St> { pub(super) fn new(stream: &'a mut St) -> Self { Self { stream } } } // Next는 Future를 구현하며, 각 poll()은 기본적으로 poll_next()를 통해 스트림에서 폴링합니다. impl<St: ?Sized + Stream + Unpin> Future for Next<'_, St> { type Output = Option<St::Item>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { self.stream.poll_next_unpin(cx) } }
Stream 생성
futures
라이브러리는 다음과 같은 기본적인 Stream
을 생성하는 데 편리한 여러 가지 메서드를 제공합니다.
empty()
: 빈Stream
을 생성합니다.once()
: 단일 값을 포함하는Stream
을 생성합니다.pending()
: 값을 생성하지 않고 항상Poll::Pending
을 반환하는Stream
을 생성합니다.repeat()
: 동일한 값을 반복적으로 생성하는Stream
을 생성합니다.repeat_with()
: 클로저를 통해 지연 방식으로 값을 생성하는Stream
을 생성합니다.poll_fn()
:Poll
을 반환하는 클로저에서Stream
을 생성합니다.unfold()
: 초기 상태와Future
를 반환하는 클로저에서Stream
을 생성합니다.
use futures::prelude::*; #[tokio::main] async fn main() { let mut st = stream::iter(1..10) .filter(|x| future::ready(x % 2 == 0)) .map(|x| x * x); // 스트림을 반복합니다. while let Some(x) = st.next().await { println!("Got item: {}", x); } }
위의 코드에서 stream::iter
는 Stream
을 생성하고, 이는 filter
및 map
연산을 통해 전달됩니다. 마지막으로 스트림이 반복되고 결과 데이터가 인쇄됩니다.
async/await에 관심이 없고 스트림 동작에만 관심이 있다면, Stream::iter
는 테스트에 매우 유용합니다. 또 다른 흥미로운 메서드는 repeat_with
입니다. 예를 들어 다음과 같이 필요에 따라 지연 방식으로 값을 생성하기 위해 클로저를 전달할 수 있습니다.
use futures::stream::{self, StreamExt}; // 2의 0번째에서 3번째 거듭제곱: async fn stream_repeat_with() { let mut curr = 1; let mut pow2 = futures::stream::repeat_with(|| { let tmp = curr; curr *= 2; tmp }); assert_eq!(Some(1), pow2.next().await); assert_eq!(Some(2), pow2.next().await); assert_eq!(Some(4), pow2.next().await); assert_eq!(Some(8), pow2.next().await); }
Stream 구현
자신의 Stream
을 생성하려면 두 단계가 필요합니다.
- 먼저 스트림의 상태를 유지하기 위해
struct
를 정의합니다. - 그런 다음 해당
struct
에 대해Stream
트레이트를 구현합니다.
1부터 5까지 카운트하는 Counter
라는 스트림을 만들어 보겠습니다.
#![feature(async_stream)] // 먼저 struct: /// 1부터 5까지 세는 스트림 struct Counter { count: usize, } // 카운터가 1부터 시작하기를 원하므로 도우미로 `new()` 메서드를 추가해 보겠습니다. // 이것은 엄격하게 필요한 것은 아니지만 편리합니다. // `count`를 0부터 시작한다는 점에 유의하세요. 그 이유는 `poll_next()` 구현에서 분명해질 것입니다. impl Counter { fn new() -> Counter { Counter { count: 0 } } } // 그런 다음 `Counter`에 대해 `Stream`을 구현합니다. impl Stream for Counter { // 카운트에 `usize`를 사용합니다. type Item = usize; // `poll_next()`는 유일하게 필요한 메서드입니다. fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { // 카운터를 증가시킵니다. 이것이 0부터 시작한 이유입니다. self.count += 1; // 카운트가 완료되었는지 확인합니다. if self.count < 6 { Poll::Ready(Some(self.count)) } else { Poll::Ready(None) } } }
Stream 트레이트
Rust에는 Stream
, TryStream
및 FusedStream
과 같은 스트림과 관련된 여러 트레이트가 있습니다.
-
Stream
은Iterator
와 매우 유사합니다. 그러나None
을 반환하면 스트림이 소진되었으며 더 이상 폴링해서는 안 됨을 나타냅니다.None
을 반환한 후 스트림을 계속 폴링하면 정의되지 않은 동작이 발생하고 예측할 수 없는 결과가 발생할 수 있습니다. -
TryStream
은Result<value, error>
항목을 생성하는 스트림을 위한 특수 트레이트입니다.TryStream
은 내부Result
를 쉽게 일치시키고 변환할 수 있는 기능을 제공합니다. 오류 처리 사례를 더 편리하게 처리할 수 있도록Result
항목을 생성하는 스트림을 위해 설계된 API로 생각할 수 있습니다. -
FusedStream
은 일반 스트림과 유사하지만 사용자가None
을 반환한 후 스트림이 실제로 소진되었는지 또는 다시 안전하게 폴링할 수 있는지 여부를 알 수 있는 기능을 추가합니다. 예를 들어 순환 버퍼를 지원하는 스트림을 생성하는 경우 스트림이 첫 번째 반복에서None
을 반환할 수 있지만FusedStream
을 사용하면 나중에 다시 폴링하여 버퍼에 대한 새 라운드 반복을 재개하는 것이 안전합니다.
반복 및 동시성
Iterator
트레이트와 마찬가지로 Stream
도 반복을 지원합니다. 예를 들어 map
, filter
, fold
, for_each
, skip
과 같은 메서드와 오류 인식에 상응하는 try_map
, try_filter
, try_fold
, try_for_each
등을 사용할 수 있습니다.
그러나 Iterator
와 달리 for
루프는 Stream
을 반복하는 데 직접 사용할 수 없습니다. 대신 while let
또는 loop
와 같은 명령형 스타일 루프를 사용하여 next
또는 try_next
를 명시적으로 반복적으로 호출할 수 있습니다. 예를 들어 다음 방법 중 하나로 스트림에서 읽을 수 있습니다.
// 반복 패턴 1 while let Some(value) = s.next().await {} // 반복 패턴 2 loop { match s.next().await { Some(value) => {} None => break; } }
스트림에서 값의 합계를 계산하는 예:
use futures_util::{pin_mut, Stream, stream, StreamExt}; async fn sum(stream: impl Stream<Item=usize>) -> usize { // 반복하기 전에 스트림을 핀하는 것을 잊지 마세요. pin_mut!(stream); let mut sum: usize = 0; // 스트림을 반복합니다. while let Some(item) = stream.next().await { sum = sum + item; } sum }
값을 한 번에 하나씩 처리하면 동시성의 이점을 놓칠 수 있으며 이는 비동기 프로그래밍의 목적을 무너뜨립니다. Stream
에서 여러 값을 동시에 처리하려면 for_each_concurrent
및 try_for_each_concurrent
를 사용할 수 있습니다.
use std::{pin::Pin, io}; use futures_util::{Stream, TryStreamExt}; async fn jump_around(stream: Pin<&mut dyn Stream<Item = Result<i32, io::Error>>>) -> Result<(), io::Error> { // `try_for_each_concurrent` 사용 stream.try_for_each_concurrent(100, |num| async move { jump_n_times(num).await?; report_n_jumps(num).await?; Ok(()) }).await?; Ok(()) } async fn jump_n_times(num: i32) -> Result<(), io::Error> { println!("jump_n_times :{}", num + 1); Ok(()) } async fn report_n_jumps(num: i32) -> Result<(), io::Error> { println!("report_n_jumps : {}", num); Ok(()) }
요약
Stream
은 Future
와 유사하지만 Future
는 단일 항목의 상태 변화를 나타내는 반면 Stream
은 완료 전에 여러 값을 생성할 수 있는 Iterator
와 더 유사하게 동작합니다. 또는 간단히 말하면: Stream
은 일련의 Futures
로 구성되며 완료될 때까지 Stream
에서 각 Future
의 결과를 검색할 수 있습니다. 즉, 비동기 반복기입니다.
Stream
의 poll_next
함수는 다음 세 가지 가능한 값 중 하나를 반환할 수 있습니다.
Poll::Pending
: 다음 값이 아직 준비되지 않았으며 여전히 기다려야 함을 나타냅니다.Poll::Ready(Some(val))
: 값이 준비되었고 성공적으로 반환되었음을 나타냅니다.poll_next
를 다시 호출하여 다음 값을 검색할 수 있습니다.Poll::Ready(None)
: 스트림이 끝났음을 나타내며poll_next
를 더 이상 호출해서는 안 됩니다.
Rust 프로젝트 호스팅을 위한 최고의 선택, Leapcell입니다.
Leapcell은 웹 호스팅, 비동기 작업 및 Redis를 위한 차세대 서버리스 플랫폼입니다.
다국어 지원
- Node.js, Python, Go 또는 Rust로 개발하십시오.
무료로 무제한 프로젝트 배포
- 사용량에 대해서만 지불 - 요청 없음, 요금 없음.
탁월한 비용 효율성
- 유휴 요금 없이 종량제로 지불합니다.
- 예: $25는 평균 응답 시간 60ms에서 694만 건의 요청을 지원합니다.
간소화된 개발자 경험
- 간편한 설정을 위한 직관적인 UI.
- 완전 자동화된 CI/CD 파이프라인 및 GitOps 통합.
- 실행 가능한 통찰력을 위한 실시간 메트릭 및 로깅.
간편한 확장성 및 고성능
- 고도의 동시성을 쉽게 처리하기 위한 자동 확장.
- 운영 오버헤드가 제로 - 빌드에만 집중하세요.
설명서에서 자세히 알아보세요!
X에서 팔로우하세요: @LeapcellHQ