수천 개의 WebSocket 연결 관리: 액터 모델 대 Mutex<HashMap>
Olivia Novak
Dev Intern · Leapcell

소개
실시간 웹 애플리케이션의 영역에서 WebSocket 연결은 필수 불가결해졌습니다. 채팅 애플리케이션, 협업 도구부터 라이브 대시보드 및 게임에 이르기까지 클라이언트와 서버 간의 지속적이고 낮은 지연 시간의 통신을 유지하는 능력은 무엇보다 중요합니다. 이러한 애플리케이션이 확장됨에 따라 아마도 수천, 심지어 수만 개의 동시 WebSocket 연결과 관련된 상태를 관리하는 것은 상당한 아키텍처적 과제를 제시합니다. 데이터 일관성, 높은 처리량 및 내결함성을 보장하려면 동시성 모델과 데이터 구조에 대한 신중한 고려가 필요합니다. 이 글에서는 Rust 생태계에서 이 과제를 해결하기 위한 두 가지 주요 접근 방식, 즉 액터 모델과 보다 전통적인 Mutex<HashMap>을 탐구하고, 그 원리, 구현 및 방대한 수의 연결 관리에 대한 실제적인 함의를 분석합니다.
확장 가능한 연결 관리를 위한 핵심 개념
두 가지 주요 접근 방식을 자세히 살펴보기 전에 Rust에서 동시 WebSocket 연결을 효과적으로 관리하는 데 중요한 핵심 개념에 대한 공통된 이해를 구축해 보겠습니다.
WebSocket 연결 상태
각 활성 WebSocket 연결에는 종종 사용자 ID, 세션 정보, 다양한 주제에 대한 구독 세부 정보 또는 메시지를 클라이언트로 다시 보내기 위한 채널의 Sender 부분과 같은 관련 데이터가 있습니다. 이 "상태"는 메시지가 도착하거나 이벤트가 발생할 때 애플리케이션의 다른 부분에서 접근하고 수정할 수 있어야 합니다.
동시성과 병렬성
Rust의 소유권 및 차용 시스템은 컴파일 시간 데이터 경합을 방지하는 강력한 도구입니다. 그러나 (WebSocket 서버에서 일반적인) 여러 비동기 작업 간에 공유되는 가변 상태를 다룰 때는 신중한 패턴이 필요합니다.
- 동시성: 잠재적으로 단일 코어에서 시간 경과에 따라 여러 작업을 교차하여 처리합니다. 이것은 Rust의
async/await에 일반적입니다. - 병렬성: 일반적으로 여러 CPU 코어에서 여러 작업을 동시에 실행합니다.
비동기 프로그래밍 (async/await)
Rust의 async/await 구문은 네트워크 통신과 같은 I/O 바운드 작업에 중요한 비차단 코드를 작성하는 방법을 제공합니다. 단일 스레드는 I/O 작업 중에 제어를 양보함으로써 많은 WebSocket 연결을 동시에 관리하여 다른 작업이 실행되도록 할 수 있습니다.
메시지 전달
작업이 직접적인 가변 메모리 공유 대신 서로 데이터를 보내 서로 통신하는 기본 동시성 프리미티브입니다. 이는 종종 채널(예: flume, tokio::mpsc)을 포함합니다.
공유 가변 상태
여러 작업이 동일한 데이터 조각에 액세스하고 잠재적으로 수정해야 하는 경우 이는 공유 가변 상태가 됩니다. Rust는 주로 공유 소유권을 위한 Arc(Atomic Reference Counted)와 독점 액세스를 위한 Mutex와 같은 동기화 프리미티브를 사용하여 이를 안전하게 관리할 수 있는 여러 메커니즘을 제공합니다.
연결 관리: 액터 모델
액터 모델은 "액터"가 보편적인 프리미티브인 동시 계산을 위한 강력한 패러다임입니다. 각 액터는 자체 상태, 동작 및 사서함을 가진 독립적인 계산 엔터티입니다. 액터는 서로의 사서함으로 불변 메시지를 보내는 것을 통해서만 통신합니다. 액터는 한 번에 하나씩 메시지를 처리하여 여러 송신자가 내부 상태에 동시에 액세스할 수 없도록 하여 설계상 데이터 경합을 제거합니다.
원리
WebSocket 연결의 맥락에서 액터 모델 접근 방식은 일반적으로 다음을 포함합니다.
- 연결 액터: 각 WebSocket 연결은 이론적으로 자체 상태를 관리하고 메시지를 보내고 받는 액터가 될 수 있습니다. 그러나 수천 개의 연결의 경우 연결당 완전한 액터를 만드는 것은 너무 많은 오버헤드를 초래할 수 있습니다.
- 연결 관리자 액터: 더 일반적이고 확장 가능한 접근 방식은 모든 활성 연결의 상태를 소유하고 관리하는 단일 "연결 관리자" 액터(또는 소수의 "샤드" 액터)를 갖는 것입니다. WebSocket 클라이언트가 메시지를 보내면 연결 관리자 액터로 전달됩니다. 서버가 특정 클라이언트에 메시지를 보내야 하는 경우 연결 관리자 액터에 메시지를 보내고, 이 액터는 클라이언트의 송신 채널을 조회하여 메시지를 디스패치합니다.
tokio::mpsc를 사용한 구현 예시
tokio::mpsc 채널을 사용한 간단한 예시로 설명해 보겠습니다.
use tokio::net::{TcpListener, TcpStream}; use tokio::sync::{mpsc, Mutex}; use tokio_tungstenite::{accept_async, tungstenite::Message}; use futures_util::{StreamExt, SinkExt}; use std::collections::HashMap; use std::sync::Arc; // --- Connection Manager Actor를 위한 메시지 --- #[derive(Debug)] enum ConnectionManagerMessage { Registro { id: u32, sender: mpsc::Sender<Message> }, Desregistrar { id: u32 }, // 예: 모든 대상 브로드캐스트 또는 특정 대상 보내기 Broadcast { msg: String }, SendToClient { id: u32, msg: String }, } // --- Connection Manager Actor --- struct ConnectionManagerActor { connections: HashMap<u32, mpsc::Sender<Message>>, next_client_id: u32, } impl ConnectionManagerActor { fn new() -> Self { ConnectionManagerActor { connections: HashMap::new(), next_client_id: 0, } } async fn run(mut self, mut receiver: mpsc::Receiver<ConnectionManagerMessage>) { while let Some(msg) = receiver.recv().await { match msg { ConnectionManagerMessage::Register { id, sender } => { self.connections.insert(id, sender); println!("Client {} registered. Total: {}", id, self.connections.len()); }, ConnectionManagerMessage::Unregister { id } => { self.connections.remove(&id); println!("Client {} unregistered. Total: {}", id, self.connections.len()); }, ConnectionManagerMessage::Broadcast { msg } => { for (_id, sender) in &self.connections { let _ = sender.send(Message::text(msg.clone())).await; } }, ConnectionManagerMessage::SendToClient { id, msg } => { if let Some(sender) = self.connections.get(&id) { let _ = sender.send(Message::text(msg)).await; } else { eprintln!("Client {} not found for targeted message.", id); } }, } } } fn generate_id(&mut self) -> u32 { let id = self.next_client_id; self.next_client_id += 1; id } } // --- WebSocket Handler Task --- async fn handle_connection( raw_stream: TcpStream, manager_sender: mpsc::Sender<ConnectionManagerMessage>, client_id: u32, ) { let ws_stream = match accept_async(raw_stream).await { Ok(ws) => ws, Err(e) => { eprintln!("Error during WebSocket handshake: {}", e); return; } }; let (mut ws_sender, mut ws_receiver) = ws_stream.split(); let (tx, mut rx) = mpsc::channel::<Message>(100); // Channel for sending messages to this specific client // Register client with the manager let _ = manager_sender.send(ConnectionManagerMessage::Register { id: client_id, sender: tx }).await; // Task to send messages from manager to client let send_to_client_task = tokio::spawn(async move { while let Some(msg) = rx.recv().await { if let Err(e) = ws_sender.send(msg).await { eprintln!("Error sending message to client {}: {}", client_id, e); break; } } }); // Task to receive messages from client and forward to manager (or process directly) while let Some(msg) = ws_receiver.next().await { match msg { Ok(Message::Text(text)) => { println!("Received from client {}: {}", client_id, text); // Example: client sends a broadcast request if text == "broadcast" { let _ = manager_sender.send(ConnectionManagerMessage::Broadcast { msg: format!("Hello from client {}", client_id) }).await; } else { // Or simply echo back let _ = manager_sender.send(ConnectionManagerMessage::SendToClient { id: client_id, msg: format!("Echo: {}", text) }).await; } }, Ok(Message::Ping(_)) => { let _ = ws_sender.send(Message::Pong(vec![])).await; }, Ok(Message::Close(_)) => { println!("Client {} disconnected.", client_id); break; }, Err(e) => { eprintln!("Error receiving from client {}: {}", client_id, e); break; }, _ => {} // Ignore other message types for simplicity } } println!("Client handler for {} shutting down.", client_id); let _ = manager_sender.send(ConnectionManagerMessage::Unregister { id: client_id }).await; send_to_client_task.abort(); // Stop the sender task } #[tokio::main] async fn main() { let listener = TcpListener::bind("127.0.0.1:8080").await.expect("Can't listen"); println!("Listening on: 127.0.0.1:8080"); let (manager_sender, manager_receiver) = mpsc::channel::<ConnectionManagerMessage>(1000); // Channel for manager messages let mut manager_actor = ConnectionManagerActor::new(); let manager_sender_clone = manager_sender.clone(); // Clone for the main loop // Spawn the Connection Manager Actor tokio::spawn(async move { manager_actor.run(manager_receiver).await; }); loop { let (stream, _) = listener.accept().await.expect("failed to accept"); let client_id = { let mut guard = manager_actor.next_client_id; // Temporary access for ID generation, careful here // In a real actor model, ID generation would be a message to the manager // For simplicity, we'll assume manager_actor is mutable here. // A better way would be the manager sending a message BACK with the assigned ID. let id = guard; guard += 1; id }; tokio::spawn(handle_connection(stream, manager_sender_clone.clone(), client_id)); } }
(참고: main의 ID 생성은 액터 모델 예시에서 단순화되었습니다. 순수한 액터 모델에서는 ID 생성조차 액터에 대한 메시지이거나 액터가 Register 시 ID를 할당하고 클라이언트 핸들러로 다시 보내는 것이 좋습니다.)
애플리케이션 시나리오
액터 모델은 특히 다음과 같은 경우에 적합합니다.
- 복잡한 상태 전환: 연결 상태가 다양한 수신 메시지에 따라 상당히 변경될 수 있는 경우.
- 서비스 검색/라우팅: 액터는 어떤 클라이언트가 어떤 주제를 구독하고 있는지 관리하고 그에 따라 메시지를 라우팅할 수 있습니다.
- 분리된 구성 요소: 기본적으로 느슨한 결합을 촉진하여 시스템을 더 쉽게 이해하고 테스트할 수 있습니다.
연결 관리: Mutex<HashMap>
Mutex<HashMap> 접근 방식은 Rust에서 공유 상태를 관리하는 더 직접적인 방법입니다. 이는 tokio::sync::Mutex(또는 async 컨텍스트가 아닌 경우 std::sync::Mutex)로 보호되는 HashMap(키는 연결 ID이고 값은 송신자 부분 또는 전체 연결 개체일 수 있음)을 래핑하고 일반적으로 Arc로 래핑하여 작업 간에 공유 소유권을 갖도록 하는 것을 포함합니다.
원리
작업이 공유 연결 상태에 액세스하거나 수정해야 할 때:
Mutex에 대한 잠금을 획득합니다. 이는 잠금이 해제될 때까지 다른 작업이 잠금을 획득하는 것을 차단하여 독점 액세스를 보장합니다.HashMap에서 필요한 작업을 수행합니다.- 잠금을 해제합니다.
이 메커니즘은 공유 데이터에 대한 액세스를 직렬화하여 명시적으로 데이터 경합을 방지합니다.
Arc<Mutex<HashMap>>를 사용한 구현 예시
use tokio::net::{TcpListener, TcpStream}; use tokio::sync::{mpsc, Mutex}; use tokio_tungstenite::{accept_async, tungstenite::Message}; use futures_util::{StreamExt, SinkExt}; use std::collections::HashMap; use std::sync::Arc; // 모든 연결에 대한 공유 상태 struct SharedState { connections: Mutex<HashMap<u32, mpsc::Sender<Message>>>, next_client_id: Mutex<u32>, } impl SharedState { fn new() -> Self { SharedState { connections: Mutex::new(HashMap::new()), next_client_id: Mutex::new(0), } } } // --- WebSocket Handler Task --- async fn handle_connection_mutex( raw_stream: TcpStream, state: Arc<SharedState>, ) { let ws_stream = match accept_async(raw_stream).await { Ok(ws) => ws, Err(e) => { eprintln!("Error during WebSocket handshake: {}", e); return; } }; let (mut ws_sender, mut ws_receiver) = ws_stream.split(); let (tx, mut rx) = mpsc::channel::<Message>(100); // Channel for sending messages to this specific client let client_id = { let mut next_id = state.next_client_id.lock().await; let id = *next_id; *next_id += 1; id }; println!("New client connected, ID: {}", client_id); // Register client with the shared state { let mut connections = state.connections.lock().await; connections.insert(client_id, tx); println!("Client {} registered (Mutex). Total: {}", client_id, connections.len()); } // Task to send messages from manager to client let send_to_client_task = tokio::spawn(async move { while let Some(msg) = rx.recv().await { if let Err(e) = ws_sender.send(msg).await { eprintln!("Error sending message to client {}: {}", client_id, e); break; } } }); // Task to receive messages from client and process/forward while let Some(msg) = ws_receiver.next().await { match msg { Ok(Message::Text(text)) => { println!("Received from client {} (Mutex): {}", client_id, text); // Example: Broadcast to all if text == "broadcast" { let connections = state.connections.lock().await; for (&id, sender) in connections.iter() { if id != client_id { // Don't send back to self for broadcast example let _ = sender.send(Message::text(format!("Broadcast from {}: {}", client_id, text))).await; } } } else { // Echo back to sender let connections = state.connections.lock().await; if let Some(sender) = connections.get(&client_id) { let _ = sender.send(Message::text(format!("Echo (Mutex): {}", text))).await; } } }, Ok(Message::Ping(_)) => { let _ = ws_sender.send(Message::Pong(vec![])).await; }, Ok(Message::Close(_)) => { println!("Client {} disconnected (Mutex).", client_id); break; }, Err(e) => { eprintln!("Error receiving from client {} (Mutex): {}", client_id, e); break; }, _ => {} // Ignore other message types for simplicity } } println!("Client handler for {} shutting down (Mutex).", client_id); // Unregister client from shared state { let mut connections = state.connections.lock().await; connections.remove(&client_id); println!("Client {} unregistered (Mutex). Total: {}", client_id, connections.len()); } send_to_client_task.abort(); // Stop the sender task } #[tokio::main] async fn main() { let listener = TcpListener::bind("127.0.0.1:8081").await.expect("Can't listen"); println!("Listening on: 127.0.0.1:8081 (Mutex)"); let shared_state = Arc::new(SharedState::new()); loop { let (stream, _) = listener.accept().await.expect("failed to accept"); tokio::spawn(handle_connection_mutex(stream, Arc::clone(&shared_state))); } }
애플리케이션 시나리오
Mutex<HashMap> 접근 방식은 종종 다음과 같은 경우에 선호됩니다.
- 단순성이 핵심일 때: 공유 상태가 비교적 간단하고 여기에 대한 작업 수가 많지 않은 애플리케이션의 경우
Mutex는 개념적으로 이해하고 구현하기 더 쉽습니다. - 오버헤드가 적을 때: 메시지 전달의 간접화 없이 직접적인
Mutex액세스는 때때로 개별 작업에 대한 지연 시간이 낮을 수 있지만, 이는 경쟁으로 상쇄될 수 있습니다. - 직접 액세스: 애플리케이션의 여러 다른 부분이 연결 정보의 하위 집합에 직접 쿼리하거나 수정해야 하는 경우.
비교 및 고려 사항
| 특징 | 액터 모델 (ConnectionManagerActor) | Mutex<HashMap> |
|---|---|---|
| 동시성 모델 | 메시지 전달, 액터당 단일 스레드 처리 | 공유 메모리, 명시적 잠금 |
| 데이터 안전성 | 메시지 전달 설계를 통한 본질적인 안전성; 액터가 자체 상태 소유. | Mutex를 통한 안전성 (독점 액세스 보장). |
| 확장성 | 액터 분할 또는 관리자 액터 간 로드 분산을 통한 높은 확장성. 단일 액터에서 메시지를 순차적으로 처리하는 것은 병목 현상이 될 수 있습니다. | 잠금 획득으로 인해 Mutex 경쟁이 발생하면 병목 현상이 될 수 있습니다. 적당한 로드에는 좋습니다. |
| 복잡성 | 메시지 및 채널 정의로 인한 초기 설정 보일러플레이트가 높습니다. 설정이 완료되면 비즈니스 로직을 이해하기 더 쉽습니다. | 초기 설정이 더 간단합니다. 복잡한 시나리오에서 교착 상태를 관리하거나 적절한 잠금 해제를 보장하기 위해 복잡해질 수 있습니다. |
| 성능 | 메시지 전달의 오버헤드. 실제 데이터에 대한 잠금을 피함으로써 좋은 처리량. | Mutex 잠금/잠금 해제의 오버헤드. 경쟁 상황에서 더 높은 지연 시간을 가질 수 있습니다. |
| 테스트 용이성 | 사전 정의된 메시지를 보내고 응답을 확인하여 액터를 독립적으로 테스트하기 쉽습니다. | |
| 동시 액세스를 시뮬레이션하고 경합 조건을 확인하는 복잡한 테스트. | ||
| 디버깅 | 메시지는 이벤트의 명확한 감사 추적을 제공합니다. 상태 변경을 추적하기 쉽습니다. | |
| 교착 상태 또는 미묘한 경합 조건 디버깅은 어려울 수 있습니다. | ||
| 장애 격리 | 액터 장애는 일반적으로 해당 액터로 격리됩니다. | |
| 공유 데이터에 액세스하는 버그는 전체 시스템을 불안정하게 만들 수 있습니다. |
둘 중 하나를 선택할 때
-
액터 모델을 선택할 때:
- 애플리케이션 로직이 복잡하고, 여러 개의 명확한 상태가 있거나 특정 라우팅 요구 사항이 있는 경우.
Mutex경쟁이 상당한 병목 현상이 될 수 있는 매우 높은 동시성(초당 수천 개의 메시지)을 예상하는 경우.- 상태 전환에 대한 명시적인 제어를 선호하고 구성 요소가 잘 정의된 메시지를 통해서만 통신하는 시스템을 선호하는 경우.
- 대규모 분산 시스템에서의 유지 관리성 및 테스트 용이성이 우선 순위인 경우.
-
Mutex<HashMap>를 선택할 때:- 공유 상태가 비교적 간단하고(예: 클라이언트 송신자만 저장) 공유 맵을 직접 수정하는 동시 작업 수가 적당하여 경쟁이 주요 문제가 되지 않을 것으로 예상되는 경우.
- 직접적인 (경쟁이 없을 때) 공유 상태에 대한 더 빠른 액세스가 필요한 경우.
- 초기 구현의 단순성이 우선 순위인 경우.
이 두 가지가 상호 배타적이지 않다는 점도 주목할 가치가 있습니다. 높은 수준의 연결 관리 및 전반적인 시스템 조정을 위해 액터 모델을 사용하는 반면, 개별 액터(또는 액터 모델 외부의 시스템 부분)는 내부의 제한된 범위의 공유 가변 상태에 대해 Mutex를 사용할 수 있습니다.
결론
Rust에서 수천 개의 WebSocket 연결을 관리하려면 강력한 동시성 전략이 필요합니다. 중앙 관리자 액터를 통한 액터 모델과 Arc<Mutex<HashMap>> 패턴 모두 유효한 접근 방식을 제공합니다. 액터 모델은 메시지 기반 통신을 시행하고 상태를 격리하여 본질적으로 안전하고 확장 가능한 설계를 제공하여 고도로 대화형이고 데이터가 풍부한 실시간 애플리케이션에 이상적입니다. 대조적으로, Mutex<HashMap>는 덜 복잡한 공유 상태에 대한 더 간단하고 직접적인 솔루션을 제공하며, 종종 중간 정도의 부하에서 충분하고 성능이 뛰어나며 더 적은 아키텍처 계층을 사용합니다. 궁극적으로 최상의 선택은 애플리케이션의 특정 요구 사항에 따라 복잡성, 예상되는 부하 및 엄격한 상태 일관성 보장에 대한 필요성을 균형 있게 고려합니다. 두 패턴 모두 현명하게 적용될 때 Rust 개발자가 고도로 성능이 뛰어나고 안정적인 실시간 서비스를 구축할 수 있도록 지원합니다.

