Rust 웹 서비스에서 스트림을 사용한 롱 폴링 구현
James Reed
Infrastructure Engineer · Leapcell

소개
현대 웹 애플리케이션의 세계에서 사용자에게 실시간 업데이트를 제공하는 것은 무엇보다 중요합니다. 채팅 애플리케이션, 라이브 대시보드 또는 알림 시스템이든 사용자는 페이지를 계속 새로고침하지 않고도 최신 데이터를 기대합니다. 진정한 양방향 통신에는 종종 웹소켓이 사용되지만, 클라이언트가 주로 업데이트를 수신하는 더 간단한 시나리오나 레거시 인프라를 다룰 때는 과도할 수 있습니다. 이때 롱 폴링이 빛을 발합니다. 롱 폴링은 HTTP 요청을 통해 유사 실시간 경험을 제공하며, 특정 사용 사례에서 지속적으로 열린 웹소켓 연결보다 통합을 더 간단하고 종종 더 견고하게 만듭니다. 이 글에서는 Rust의 강력한 비동기 생태계와 스트림 기반 API를 활용하여 웹 서비스에서 견고하고 확장 가능한 롱 폴링 메커니즘을 구현하는 방법에 대해 자세히 알아봅니다.
핵심 개념 및 구현
Rust 구현으로 들어가기 전에 롱 폴링 및 Rust의 비동기 기능과 관련된 몇 가지 주요 용어를 명확히 해 보겠습니다.
롱 폴링 (Long Polling): 클라이언트가 서버에 HTTP 요청을 보내고, 서버가 새 데이터가 사용 가능해지거나 타임아웃이 발생할 때까지 의도적으로 해당 요청을 열어 두는 기법입니다. 데이터가 준비되면 서버가 응답하고 클라이언트는 즉시 다시 요청합니다. 이는 표준 HTTP를 통해 푸시와 유사한 메커니즘을 시뮬레이션합니다.
비동기 프로그래밍 (Rust): Rust의 async
/await
구문을 사용하면 I/O 작업(예: 네트워크 요청 또는 데이터베이스 쿼리)이 완료될 때까지 실행 스레드를 차단하지 않는 동시 코드를 작성할 수 있습니다. 이는 많은 동시 연결을 효율적으로 처리해야 하는 고성능 웹 서비스에 매우 중요합니다.
스트림 (Rust): futures
및 비동기 Rust의 맥락에서 Stream
은 Iterator
와 유사하지만 async
컨텍스트에 대한 비동기 값 시퀀스입니다. 모든 데이터가 즉시 있어야 하는 것이 아니라 데이터가 사용 가능해짐에 따라 처리할 수 있습니다. 이는 여러 데이터 청크를 보내거나 새 이벤트가 도착하는 동안 연결을 유지하려는 롱 폴링에 특히 유용합니다.
롱 폴링 원리
롱 폴링의 서버 측 구현은 일반적으로 다음 단계를 포함합니다.
- 클라이언트 요청: 클라이언트는 특정 엔드포인트(예:
/events
)로 HTTP GET 요청을 보냅니다. - 서버 요청 보류: 새 데이터가 즉시 사용 가능하지 않으면 서버는 응답하지 않습니다. 대신 클라이언트 요청(또는 그 표현)을 큐에 넣거나 이벤트 소스에 구독시킵니다.
- 이벤트 알림: 새 데이터나 이벤트가 발생하면 서버는 대기 중인 클라이언트 요청을 검색합니다.
- 서버 응답: 서버는 새 데이터가 포함된 HTTP 응답을 구성하여 클라이언트에 보냅니다.
- 클라이언트 재시도: 응답을 받은 후 클라이언트는 즉시 다른 롱 폴링 요청을 시작하여 향후 이벤트를 계속 수신합니다.
Rust에서 스트림을 사용한 롱 폴링 구현
Rust의 tokio
런타임과 axum
웹 프레임워크는 tokio::sync::broadcast
채널 및 futures::StreamExt
와 결합하여 스트림을 사용하는 견고한 롱 폴링 서비스를 구축할 수 있는 훌륭한 기반을 제공합니다.
사용자가 일반 "이벤트"를 구독하고 수신할 수 있는 간단한 이벤트 시스템을 고려해 보겠습니다.
use axum::* use futures::Stream; use serde::Deserialize; use std::* use tokio::* use tokio_stream::wrappers::BroadcastStream; /// 클라이언트에 보낼 수 있는 이벤트를 나타냅니다. #[derive(Debug, Clone, serde::Serialize)] struct MyEvent { id: u64, message: String, } /// 우리의 애플리케이션 상태, 브로드캐스트 송신자를 보유합니다. #[derive(Clone)] struct AppState { event_sender: broadcast::Sender<MyEvent>, event_counter: Arc<Mutex<u64>>, } #[tokio::main] async fn main() { let (event_sender, _receiver) = broadcast::channel(16); let app_state = AppState { event_sender: event_sender.clone(), event_counter: Arc::new(Mutex::new(0)), }; // 이벤트 생성을 시뮬레이션합니다 (예: 다른 서비스 또는 내부 로직에서). tokio::spawn(generate_dummy_events(event_sender.clone(), app_state.event_counter.clone())); let app = Router::new() .route("/events/long-poll", get(long_poll_handler)) .route("/events/trigger", get(trigger_event)) .with_state(app_state); let listener = tokio::net::TcpListener::bind("127.0.0.1:3000") .await .unwrap(); println!("Listening on {}", listener.local_addr().unwrap()); axum::serve(listener, app).await.unwrap(); } // 롱 폴링 엔드포인트 핸들러 async fn long_poll_handler( State(app_state): State<AppState>, ) -> Sse<Pin<Box<dyn Stream<Item = Result<Event, Infallible>> + Send>>> { // 새 이벤트 수신기를 가져옵니다. let mut rx = app_state.event_sender.subscribe(); // 브로드캐스트 채널에서 이벤트를 내보내는 스트림을 만듭니다. // 우리는 MyEvent를 axum::sse::Event로 변환합니다. let event_stream = BroadcastStream::new(rx) .map(|event_result| match event_result { Ok(event) => { let json_data = serde_json::to_string(&event).unwrap_or_default(); Ok(Event::default().event("message").data(json_data)) } // 클라이언트가 너무 느렸던 경우 `RecvError::Lagged`를 처리합니다. // 롱 폴링의 경우 일반적으로 닫고 클라이언트가 다시 연결하도록 합니다. Err(e) => { eprintln!("Broadcast receive error: {:?}", e); // 실제 애플리케이션에서는 여기서 오류 이벤트를 보내거나 // 스트림을 종료하여 클라이언트가 다시 연결하도록 강제할 수 있습니다. Err(Infallible) } }) .boxed(); // Sse 응답은 암묵적으로 연결을 열어 두는 것을 처리합니다. // 스트림에서 도착하는 대로 이벤트를 보냅니다. // 서버가 데이터를 보낼 것으로 예상됨을 나타내기 위해 짧은 기간을 추가합니다. // 그러나 핵심 롱 폴링 타임아웃 로직은 클라이언트 측입니다. Sse::new(event_stream) } #[derive(Deserialize)] struct TriggerParams { message: String, } async fn trigger_event( State(app_state): State<AppState>, Query(params): Query<TriggerParams>, ) -> String { let mut counter = app_state.event_counter.lock().unwrap(); *counter += 1; let new_event = MyEvent { id: *counter, message: params.message.clone(), }; app_state.event_sender.send(new_event.clone()).unwrap(); format!("Event triggered: {:?}", new_event) } // 이벤트를 생성하는 외부 시스템을 시뮬레이션합니다. async fn generate_dummy_events( sender: broadcast::Sender<MyEvent>, counter: Arc<Mutex<u64>>, ) { let mut interval = interval(Duration::from_millis(2000)); // 2초마다 loop { interval.tick().await; let mut count = counter.lock().unwrap(); *count += 1; let event = MyEvent { id: *count, message: format!("Automatic event {}", *count), }; println!("Sending dummy event: {:?}", event); if let Err(e) = sender.send(event) { eprintln!("Failed to send dummy event: {}", e); } } }
코드 설명:
AppState
:tokio::sync::broadcast::Sender
를 보유하며, 이는 다중 생산자, 다중 소비자 채널입니다. 이벤트가 이 송신자를 통해 전송되면 활성 수신자 모두 복사본을 받습니다. 여러 클라이언트에 이벤트를 분산시키는 데 이상적입니다.main
함수:AppState
를 설정하고, 이벤트 생성 시뮬레이션을 위한 백그라운드 작업(generate_dummy_events
)을 시작하며,axum
HTTP 서버를 초기화합니다.generate_dummy_events
: 주기적으로MyEvent
인스턴스를 브로드캐스트 채널을 통해 전송하는 간단한async
함수입니다. 이는 새 데이터가 사용 가능해지는 것을 시뮬레이션합니다.long_poll_handler
: 핵심 롱 폴링 엔드포인트입니다.event_sender
를 구독하여broadcast::Receiver
를 가져옵니다.tokio_stream::wrappers::BroadcastStream::new(rx)
는broadcast::Receiver
를futures::Stream
으로 변환합니다. 이를 통해 들어오는 이벤트를 비동기 시퀀스로 처리할 수 있습니다.- 그런 다음 각 수신된
MyEvent
를axum::response::sse::Event
로map
합니다. 이 예제에서는 스트림 특성과 브라우저 호환성을 위해 서버 전송 이벤트(SSE)를 사용하지만, 기본 스트림 개념은 단일 JSON 응답을 각 롱 폴링 요청에 대해 보내는 경우에도 적용됩니다. SSE는 연결을 열어 두고 단일 HTTP 연결을 통해 여러 이벤트를 푸시하는 것을 본질적으로 처리하므로 고빈도 롱 폴링과 잘 맞습니다. Sse::new(event_stream)
:axum
은Stream
의Result<Event, Infallible>
을 서버 전송 이벤트 응답으로 자동 변환하는Sse
응답 유형을 제공합니다.Infallible
오류 유형은 스트림 자체에서axum
이 특별히 처리해야 하는 오류를 생성하지 않음을 가정합니다. 이벤트가 처리될 수 없으면 일반적으로 건너뛰거나 오류를 기록합니다.
trigger_event
: HTTP 요청을 통해 이벤트를 수동으로 트리거하는 간단한 엔드포인트로, 테스트에 유용합니다.
클라이언트 측 고려 사항
클라이언트의 경우 표준 fetch
또는 XMLHttpRequest
를 사용할 수 있습니다. 서버에서 SSE를 사용하므로 클라이언트 측 EventSource
API가 가장 적합합니다.
const eventSource = new EventSource('http://127.0.0.1:3000/events/long-poll'); eventSource.onmessage = function(event) { console.log("Received general message:", event.data); const data = JSON.parse(event.data); console.log("Parsed event:", data); }; eventSource.addEventListener('message', function(event) { console.log("Received 'message' event:", event.data); }); eventSource.onerror = function(err) { console.error("EventSource failed:", err); // 일반적으로 여기서 재연결을 처리합니다. // EventSource에는 일부 자동 재연결 로직이 있지만 사용자 정의할 수 있습니다. };
이 클라이언트는 연결을 열어 두고 Rust 서버에서 보낸 대로 이벤트를 수신합니다. 연결이 끊어지면 EventSource
는 다시 연결을 시도합니다. 기존 롱 폴링(SSE를 사용하지 않는 경우)의 경우 클라이언트는 응답을 받을 때마다 새 fetch
요청을 만들어야 합니다.
애플리케이션 시나리오
- 알림 시스템: 모든 클라이언트에 대해 웹소켓 오버헤드 없이 사용자 알림을 푸시합니다.
- 라이브 대시보드/피드: 클라이언트가 주로 정보를 소비하는 실시간 데이터 업데이트(예: 주가, 센서 판독값)를 표시합니다.
- 채팅 애플리케이션 (단순화): 메시지를 서버로 보내고 참가자에게 분산하는 기본 채팅의 경우 롱 폴링은 웹소켓의 간단한 대안이 될 수 있으며, 특히 서버가 업데이트를 푸시하기만 하면 됩니다.
- 게임 로비: 플레이어가 새 게임 시작이나 참가자 도착에 대해 알립니다.
결론
비동기 스트림을 사용하는 Rust 웹 서비스에서 롱 폴링을 구현하는 것은 유사 실시간 업데이트를 제공하는 견고하고 효율적인 방법입니다. tokio
의 브로드캐스트 채널과 axum
의 스트림 호환 Sse
응답을 활용함으로써 개발자는 스레드를 차단하지 않고 수많은 동시 연결을 처리하는 확장 가능한 시스템을 구축할 수 있습니다. 이 접근 방식은 양방향 통신이 엄격하게 요구되지 않는 시나리오에 대한 웹소켓의 강력한 대안을 제공하며, 더 응답성이 뛰어나고 상호작용적인 사용자 경험에 기여합니다. Rust의 성능 및 동시성 기능은 이러한 실시간 통신 패턴을 아키텍처링하는 데 매우 적합합니다.