async-graphql 구독을 사용한 실시간 데이터
Emily Parker
Product Engineer · Leapcell

소개: 정적 데이터와 동적 사용자 경험 연결
오늘날 빠르게 변화하는 디지털 세계에서 사용자는 즉각적인 업데이트와 반응형 인터페이스를 기대합니다. 정적 데이터를 가져오는 데는 효과적이지만, 요청-응답 패러다임은 라이브 스포츠 점수, 채팅 메시지 또는 주식 가격 변동과 같은 실시간 정보를 제공하는 데에는 종종 부족합니다. 서버를 지속적으로 폴링하는 것은 비효율적이고 리소스 집약적일 수 있으며, 대역폭 낭비와 서버 부하 증가로 이어집니다. 이곳이야말로 실시간 기술이 빛을 발하는 곳으로, 서버가 이벤트 발생 시 연결된 클라이언트에 적극적으로 데이터를 푸시할 수 있도록 합니다. 강력한 쿼리 언어를 갖춘 GraphQL은 데이터 가져오기에 대한 우아한 솔루션을 제공하지만, 구독을 통해 진정한 실시간 잠재력이 발휘됩니다. 이 글에서는 Rust에서 async-graphql
을 활용하여 GraphQL 구독을 구현하는 방법을 살펴보고, 애플리케이션을 정적 데이터 제공업체에서 동적이고 실시간으로 강력한 애플리케이션으로 변환합니다.
GraphQL 구독을 사용한 실시간 이해
코드를 자세히 살펴보기 전에, GraphQL 및 async-graphql
을 사용한 실시간 데이터 업데이트의 기초가 되는 몇 가지 기본 개념을 명확히 해보겠습니다.
핵심 개념
- GraphQL: API를 위한 오픈 소스 데이터 쿼리 및 조작 언어이며, 기존 데이터로 쿼리를 이행하는 런타임입니다. 클라이언트가 필요한 것을 정확히, 더 많지도 적지도 않게 요청할 수 있도록 합니다.
- 구독: 특정 이벤트가 발생할 때 클라이언트가 서버로부터 실시간 업데이트를 받을 수 있도록 하는 GraphQL 작업 유형입니다. 쿼리(한 번 가져오기) 및 뮤테이션(한 번 수정 및 가져오기)과 달리 구독은 열린 연결(일반적으로 WebSocket 기반)을 유지하고 서버에서 새 이벤트가 게시될 때 클라이언트에 데이터를 푸시합니다.
- WebSockets: 단일 TCP 연결을 통해 전이중(full-duplex) 통신 채널을 제공하는 통신 프로토콜입니다. 이 영구 연결은 구독에 중요하며, 클라이언트가 반복적으로 데이터를 요청할 필요 없이 서버가 업데이트를 푸시할 수 있도록 합니다.
async-graphql
: Rust를 위한 강력하고 직관적인 GraphQL 서버 라이브러리입니다. 구독을 포함한 모든 GraphQL 작업 유형을 지원하며 다양한 비동기 런타임 및 웹 프레임워크와 원활하게 통합됩니다.- 게시-구독(Pub/Sub) 패턴: 발신자(게시자)가 메시지를 특정 수신자(구독자)에게 직접 보내지 않고, 대신 게시된 메시지를 클래스로 분류하는 메시징 패턴입니다. 구독자는 하나 이상의 클래스에 대한 관심을 표현하고 관심 있는 메시지만 받습니다. 이 패턴은 전역 이벤트가 구독 업데이트를 트리거하는 방식의 기본입니다.
GraphQL 구독 메커니즘
기본적으로 GraphQL 구독은 다음과 같이 작동합니다.
- 클라이언트가 구독 시작: 클라이언트는 일반적으로 WebSocket 연결을 통해 서버에 GraphQL 구독 쿼리를 보냅니다.
- 서버가 이벤트 수신 대기: 서버는 구독을 받은 후 클라이언트의 특정 이벤트 스트림 또는 주제에 대한 관심을 등록합니다. 그런 다음 이러한 이벤트를 수신할 메커니즘을 설정합니다.
- 이벤트 발생 및 게시: 서버 측에서 이벤트가 발생하면(예: 새 채팅 메시지 전송, 데이터베이스 레코드 업데이트), 서버는 이 이벤트를 Pub/Sub 시스템(예: 메모리 내 채널, Redis Pub/Sub, Kafka)에 게시합니다.
- 서버가 구독자에게 알림: Pub/Sub 시스템 또는 이를 수신 대기하는 구성 요소가 게시된 이벤트를 가져옵니다. 그런 다음 GraphQL 서버는 클라이언트의 원래 구독 쿼리에 따라 이 이벤트 데이터를 변환하고 결과 페이로드를 기존 WebSocket 연결을 통해 클라이언트에 다시 푸시합니다.
- 클라이언트가 업데이트 수신: 클라이언트는 실시간 업데이트를 받고 UI 또는 상태를 업데이트하는 등 적절하게 반응할 수 있습니다.
async-graphql
을 사용한 구독 구현
새 메시지가 구독자 클라이언트에 실시간으로 푸시되는 간단한 채팅 애플리케이션을 구현하는 예를 살펴보겠습니다.
먼저 Cargo.toml
에서 종속성을 설정해야 합니다.
[dependencies] async-graphql = { version = "7.0", features = ["apollo_tracing", "tracing"] } async-graphql-warp = "7.0" # 또는 async-graphql-actix-web, async-graphql-poem 등. tokio = { version = "1.0", features = ["full"] } tokio-stream = "0.1" futures = "0.3" warp = "0.3" # 또는 actix-web, poem 등.
다음으로 구독 유형과 함께 GraphQL 스키마를 정의해 보겠습니다. 단순화를 위해 Pub/Sub 시스템 시뮬레이션을 위해 메모리 내 채널을 사용합니다. 프로덕션 환경에서는 Redis 또는 유사한 것을 고려하십시오.
use async_graphql::{ http::{GraphiQLSource, WebSocket}, Subscription, Schema, Object, futures_util::stream::{SplitSink, SplitStream}, futures_util::{Stream, SinkExt, StreamExt}, Context, EmptyMutation, }; use tokio::sync::broadcast; use tokio_stream::wrappers::BroadcastStream; use futures::channel::mpsc; use std::{pin::Pin, collections::HashMap, sync::Arc, sync::Mutex}; use warp::{Filter, Rejection, Reply}; // 간단한 Message 구조체 #[derive(Clone, Debug)] struct Message { id: u32, content: String, author: String, } // 우리의 Pub/Sub 시스템: 브로드캐스트 채널 // 스레드 간 공유 및 수정을 위해 sender를 Arc<Mutex<_>>에 저장 type MessageSender = Arc<broadcast::Sender<Message>>; // sender를 보유할 우리의 GraphQL 컨텍스트 struct AppContext { message_sender: MessageSender, // 단순화를 위해 메시지를 메모리에 저장합니다. messages: Arc<Mutex<Vec<Message>>>, next_id: Arc<Mutex<u32>>, } // 메시지를 추가하는 메서드 구현 impl AppContext { fn add_message(&self, content: String, author: String) -> Message { let mut messages = self.messages.lock().unwrap(); let mut next_id = self.next_id.lock().unwrap(); let id = *next_id; *next_id += 1; let new_message = Message { id, content, author }; messages.push(new_message.clone()); // 새 메시지를 브로드캐스트 채널에 게시합니다. if let Err(e) = self.message_sender.send(new_message.clone()) { eprintln!("Failed to send message: {:?}", e); } new_message } } // Query 유형: 기존 메시지 또는 일반 데이터 가져오기 struct Query; #[Object] impl Query { async fn hello(&self) -> String { "world".to_string() } async fn messages(&self, ctx: &Context<'_>) -> Vec<Message> { let app_ctx = ctx.data::<AppContext>().unwrap(); app_ctx.messages.lock().unwrap().clone() } } // Subscription 유형: 클라이언트가 구독할 수 있는 것을 정의합니다. struct Subscription; #[Subscription] impl Subscription { // 이 구독은 새 메시지를 스트리밍합니다. async fn new_messages<'ctx>(&self, ctx: &'ctx Context<'_>) -> impl Stream<Item = Message> + 'ctx { let app_ctx = ctx.data::<AppContext>().unwrap(); let receiver = app_ctx.message_sender.subscribe(); BroadcastStream::new(receiver) .map(|result| result.unwrap_or_else(|e| { eprintln!("Error receiving message from broadcast channel: {:?}", e); // 실제 앱에서는 이 오류를 더 정상적으로 처리하거나 // 항목을 건너뛰거나 사용자 지정 오류 메시지를 보낼 수 있습니다. Message { id: 0, content: "Error".to_string(), author: "System".to_string() } })) } } // Mutation 유형: 새 메시지 게시 struct Mutation; #[Object] impl Mutation { async fn send_message(&self, ctx: &Context<'_>, content: String, author: String) -> Message { let app_ctx = ctx.data::<AppContext>().unwrap(); app_ctx.add_message(content, author) } } // GraphQL 스키마를 빌드하는 함수 fn build_schema() -> Schema<Query, Mutation, Subscription> { let (tx, _rx) = broadcast::channel(1024); // 1024개 메시지 버퍼 let message_sender = Arc::new(tx); let app_ctx = AppContext { message_sender: message_sender.clone(), messages: Arc::new(Mutex::new(Vec::new())), next_id: Arc::new(Mutex::new(1)), }; Schema::build(Query, Mutation, Subscription) .data(app_ctx) .finish() } #[tokio::main] async fn main() { let schema = build_schema(); // GraphQL 엔드포인트 정의 let graphql_post = async_graphql_warp::graphql(schema.clone()) .and_then(|(schema, request)| async move { Ok::<_, Rejection>(async_graphql_warp::Response::from(schema.execute(request).await)) }); // GraphQL 구독 엔드포인트 정의 // Warp의 `async_graphql_warp::graphql_subscription` 사용 let graphql_ws = async_graphql_warp::graphql_subscription(schema); // GraphiQL IDE 엔드포인트 정의 let graphiql = warp::path!("graphiql") .and(warp::get()) .map(|| { warp::http::Response::builder() .header("content-type", "text/html") .body(GraphiQLSource::build().endpoint("/").subscription_endpoint("/").finish()) .unwrap() }); let routes = graphql_post .or(graphql_ws) .or(graphiql); println!("GraphiQL IDE: http://localhost:8000/graphiql"); warp::serve(routes).run(([127, 0, 0, 1], 8000)).await; }
코드 예제 설명
Message
구조체: 채팅 메시지를 나타내는 간단한Message
구조체입니다.tokio::sync::broadcast
는 메시지가 복제 가능해야 하므로Clone
이어야 합니다.MessageSender
(Pub/Sub): 메모리 내 Pub/Sub 메커니즘으로tokio::sync::broadcast::channel
을 사용합니다. 메시지가sender.send(message)
를 통해 전송되면 모든 활성 구독자가 메시지 복제본을 받습니다.AppContext
: GraphQL 컨텍스트 역할을 하며,MessageSender
및 모든 메시지(쿼리용)를 저장하는 벡터와 같은 공유 상태를 보유합니다.messages
및next_id
를Arc<Mutex<T>>
로 래핑하여 스레드 간에 안전하게 공유되는 변경 가능한 액세스를 보장합니다.Query
유형: 모든 기록 메시지를 가져오는messages
필드와 간단한hello
필드를 정의합니다.Mutation
유형:send_message
뮤테이션은 새Message
를 만들고, 메모리 내 저장소에 추가하고, 중요한 것은self.message_sender.send(new_message)
를 호출하여 게시합니다.Subscription
유형:#[Subscription]
속성은 이를 GraphQL 구독 유형으로 표시합니다.new_messages
필드는impl Stream<Item = Message>
을 반환하는 비동기 함수입니다. 이것이 구독의 핵심입니다.new_messages
내부에서app_ctx.message_sender.subscribe()
를 통해message_sender
에서 수신자를 가져옵니다.BroadcastStream::new(receiver)
는tokio::sync::broadcast::Receiver
를futures::Stream
으로 변환하며, 이는async-graphql
이 이해합니다.- 브로드캐스트 채널의 잠재적 오류를 처리하고 결과를 언래핑하기 위해
.map
을 추가합니다.
main
함수 설정:Schema
를 빌드하고.data(app_ctx)
를 사용하여AppContext
를 제공합니다. 이렇게 하면ctx.data::<AppContext>()
를 통해 모든 resolver에서AppContext
에 액세스할 수 있습니다.- 세 개의 Warp 라우트를 설정합니다.
/
:async_graphql_warp::graphql()
을 사용하여 표준 GraphQL 쿼리 및 뮤테이션을 처리합니다./
:async_graphql_warp::graphql_subscription()
을 사용하여 GraphQL을 WebSocket을 통한 GraphQL로 처리합니다. HTTP와 WebSocket 모두 동일한 엔드포인트 경로를 사용하는 것이 일반적이며, 다른 프로토콜을 통신하기 때문입니다./graphiql
: 메인 GraphQL 엔드포인트 및 구독 엔드포인트와 상호 작용하도록 구성된 GraphiQL IDE 엔드포인트를 제공합니다.
구독 테스트
- 애플리케이션 실행:
cargo run
- 브라우저에서
http://localhost:8000/graphiql
을 엽니다. - GraphiQL IDE에서 두 개의 탭 또는 창을 엽니다.
탭 1 (구독):
subscription NewMessages { newMessages { id content author } }
이 구독을 실행합니다. WebSocket 연결을 설정하고 수신 대기를 시작합니다.
탭 2 (뮤테이션):
mutation SendMessage { sendMessage(content: "Hello from GraphQL!", author: "Alice") { id content author } }
이 뮤테이션을 여러 번 실행하고 내용과 작성자를 변경합니다. 탭 1의 구독 출력에서 새 메시지가 즉시 나타나는 것을 볼 수 있습니다.
세 번째 탭에서 기록 메시지를 쿼리해 볼 수도 있습니다.
query GetMessages { messages { id content author } }
애플리케이션 시나리오
GraphQL 구독은 다양한 실시간 애플리케이션에 이상적입니다.
- 채팅 애플리케이션: 인스턴트 메시징, 그룹 채팅.
- 라이브 대시보드: 실시간 메트릭, 시스템 모니터링.
- 협업 편집: 변경 사항이 즉시 반영되는 Google Docs와 같은 애플리케이션.
- 게임: 게임 상태, 플레이어 위치 업데이트.
- 금융 애플리케이션: 라이브 주식 시세, 암호화폐 가격.
- 알림: 사용자 알림 즉시 푸시.
결론: 동적 사용자 경험 강화
Rust에서 async-graphql
을 사용한 GraphQL 구독 구현은 애플리케이션에 실시간 데이터 기능을 구축하는 강력하고 효율적인 방법을 제공합니다. WebSocket 프로토콜과 게시-구독 패턴을 활용하여 async-graphql
은 이벤트가 전개됨에 따라 서버가 클라이언트에 데이터를 능동적으로 푸시할 수 있도록 하여, 정적 데이터 표현을 동적이고 살아있는 인터페이스로 변환함으로써 사용자 경험을 크게 향상시킵니다. Rust의 성능과 async-graphql
의 편의성을 결합하면 개발자는 비교적 쉽게 고도로 반응적이고 상호 작용적인 애플리케이션을 만들 수 있습니다.