Actix WebとAxumアプリケーションにおける堅牢な状態管理
Min-jun Kim
Dev Intern · Leapcell

Rustで堅牢かつスケーラブルなWebアプリケーションを構築する際には、共有リソースを効率的に管理することがよくあります。データベース接続プール、アプリケーション全体の設定、キャッシュのいずれであっても、これらのコンポーネントは、複数の同時リクエストでアクセス可能で安全に管理される必要があります。Actix WebやAxumのような非同期Rust Webフレームワークの世界では、これは特有の課題を提示します。競合状態やパフォーマンスのボトルネックを導入することなく、これらの重要なデータの一部をどのように共有できるでしょうか?この記事では、Actix WebおよびAxumアプリケーションにおける共有状態を管理するためのさまざまな戦略について掘り下げ、それらの根本的な原則、実際の実装、および適切なユースケースを強調します。これらのテクニックを習得することで、開発者はより保守性が高く、パフォーマンスが高く、信頼性の高いRust Webサービスを構築できます。
共有状態のコアコンセプト
戦略に入る前に、Rustの非同期コンテキストにおける共有状態管理を理解するために不可欠ないくつかのコアコンセプトを定義しましょう。
- 共有状態: アプリケーションの複数の部分から、しばしば同時にアクセスおよび変更される必要があるデータ。例としては、データベース接続プール、アプリケーション構成、キャッシングレイヤー、またはメトリクスカウンターがあります。
- 並行処理: システムが複数のタスクまたはリクエストを、同時に処理できる能力。Webアプリケーションでは、これは多くのユーザーリクエストを同時に処理することを意味します。
- スレッドセーフティ: 共有データに複数のスレッドから同時にアクセスおよび変更しても、データ破損や予期しない動作につながらないという保証。Rustの型システム、特に
Send
およびSync
トレイトがここで重要な役割を果たします。 - 非同期コンテキスト: I/O(ネットワークリクエストやデータベースクエリなど)が完了するのを待っている間、現在のスレッドをブロックしない操作。すべての最新のRust Webフレームワークは、非同期ランタイム上に構築されています。
Arc
(Atomic Reference Counted): 値の複数の所有者がスレッド間で値を共有できるようにするスマートポインタ。最後のArc
がスコープを外れると、含まれる値はドロップされます。共有所有権を提供します。Mutex
(Mutual Exclusion Lock): 1つのスレッドのみが一度に共有リソースにアクセスできることを保証する同期プリミティブ。アクセスをロックすることで競合状態を防ぎ、データの整合性を確保します。RwLock
(Read-Write Lock): 複数のリーダーがリソースに同時にアクセスできることを許可する同期プリミティブですが、一度に1人のライターのみです。これは、読み取りが書き込みよりもはるかに頻繁な場合にMutex
よりも高い並行性を提供できます。OnceCell
/Lazy
: 値を正確に1回初期化し、しばしば遅延して初期化し、それから不変のアクセスを提供するユーティリティ。起動時に一度設定されるグローバル構成に便利です。
共有状態管理の戦略
Actix WebとAxumはどちらも、主にRustの並行処理プリミティブを活用した、共有状態を管理するための慣用的な方法を提供しています。
1. Arc<Mutex<T>>
/ Arc<RwLock<T>>
パターン
これは、Rustで可変の共有状態を管理するための最も基本的で広く使用されているパターンです。
原則:
共有データ T
を Mutex<T>
(またはRwLock<T>
)でラップして、変更のための排他的アクセスを保証し、次にそれを Arc<Mutex<T>>
(またはArc<RwLock<T>>
)でラップして、複数の所有権とスレッド間での安全な共有を許可します。データにアクセスする必要がある場合は、Arc
をクローンしてからMutex
(またはRwLock
)をロックして可変参照を取得します。
Arc<Mutex<T>>
: 書き込みが頻繁な場合、または常に排他的アクセスが必要な場合に使用します。Arc<RwLock<T>>
: 読み取りが書き込みよりも著しく頻繁な場合に使用します。これにより、複数のリーダーが同時にアクセスできます。
実装(Axum例):
use std:: sync::{Arc, Mutex}, collections::HashMap, }; use axum:: extract::{State, Path}, routing::{post, get}, Json, Router, }; use serde::{Serialize, Deserialize}; #[derive(Debug, Clone, Serialize, Deserialize)] struct User { id: u32, name: String, } // 共有アプリケーション状態 struct AppState { user_db: Arc<Mutex<HashMap<u32, User>>>, config_value: String, } #[tokio::main] async fn main() { let shared_state = Arc::new(AppState { user_db: Arc::new(Mutex::new(HashMap::new())), config_value: "Application Config".to_string(), }); let app = Router::new() .route("/users", post(create_user)) .route("/users/:id", get(get_user)) .with_state(shared_state); // 状態をルーターに注入 let listener = tokio::net::TcpListener::bind("127.0.0.1:3000") .await .unwrap(); println!("Listening on http://127.0.0.1:3000"); axum::serve(listener, app).await.unwrap(); } async fn create_user( State(state): State<Arc<AppState>>, // 共有状態を抽出 Json(payload): Json<User>, ) -> Json<User> { let mut db = state.user_db.lock().unwrap(); // ロックを取得 db.insert(payload.id, payload.clone()); Json(payload) } async fn get_user( State(state): State<Arc<AppState>>, // 共有状態を抽出 Path(id): Path<u32>, ) -> Option<Json<User>> { let db = state.user_db.lock().unwrap(); // ロックを取得 db.get(&id).cloned().map(Json) }
実装(Actix Web例):
use actix_web:: web, App, HttpServer, Responder, HttpResponse, }; use std:: sync::{Arc, Mutex}, collections::HashMap, }; use serde::{Serialize, Deserialize}; #[derive(Debug, Clone, Serialize, Deserialize)] struct User { id: u32, name: String, } // 共有アプリケーション状態 struct AppState { user_db: Arc<Mutex<HashMap<u32, User>>>, config_value: String, } async fn create_user_actix(state: web::Data<AppState>, user: web::Json<User>) -> impl Responder { let mut db = state.user_db.lock().unwrap(); // ロックを取得 db.insert(user.id, user.clone()); HttpResponse::Ok().json(user.0) } async fn get_user_actix(state: web::Data<AppState>, path: web::Path<u32>) -> impl Responder { let id = path.into_inner(); let db = state.user_db.lock().unwrap(); // ロックを取得 match db.get(&id) { Some(user) => HttpResponse::Ok().json(user), None => HttpResponse::NotFound().finish(), } } #[actix_web::main] async fn main() -> std::io::Result<()> { let shared_state = web::Data::new(AppState { // Actix用に状態をweb::Dataでラップ user_db: Arc::new(Mutex::new(HashMap::new())), config_value: "Application Config".to_string(), }); HttpServer::new(move || { App::new() .app_data(shared_state.clone()) // アプリケーションとデータを共有 .service(web::resource("/users").route(web::post().to(create_user_actix))) .service(web::resource("/users/{id}").route(web::get().to(get_user_actix))) }) .bind(("127.0.0.1", 8080))? .run() .await }
アプリケーション:
このパターンは、データベース接続プール(例: sqlx::PgPool
)、アプリケーション全体キャッシュ、グローバルカウンター、または複数のリクエストハンドラによって共有され、変更される可能性のあるその他のデータ管理に最適です。
2. Arc<T>
による不変状態
共有状態が、初期化後に事実上不変である場合(例: 起動時にロードされる設定)、Mutex
またはRwLock
は不要です。
原則:
不変データ T
を直接 Arc<T>
でラップします。データは変更できないため、競合状態を心配する必要はなく、複数のスレッドはロックオーバーヘッドなしで自由に同時に読み取ることができます。
実装(Axum例):
use std::sync::Arc; use axum:: extract::State, routing::get, Router, }; use serde::Serialize; #[derive(Debug, Clone, Serialize)] struct AppConfig { api_key: String, database_url: String, max_connections: u32, } // 共有不変アプリケーション状態 struct AppState { config: Arc<AppConfig>, } #[tokio::main] async fn main() { let config = Arc::new(AppConfig { api_key: "my_secret_key".to_string(), database_url: "postgres://user:pass@host:port/db".to_string(), max_connections: 10, }); let shared_state = Arc::new(AppState { config }); let app = Router::new() .route("/config", get(get_app_config)) .with_state(shared_state); let listener = tokio::net::TcpListener::bind("127.0.0.1:3001") .await .unwrap(); println!("Listening on http://127.0.0.1:3001"); axum::serve(listener, app).await.unwrap(); } async fn get_app_config(State(state): State<Arc<AppState>>) -> axum::Json<AppConfig> { // configは不変なのでロックは不要 axum::Json(state.config.as_ref().clone()) }
アプリケーション: アプリケーション設定、起動時に一度ロードされる読み取り専用データ(例: 静的なマッピング、小さなルックアップテーブル)、またはアプリケーションの実行中に変更されないことが保証されているデータに最適です。
3. tokio::sync
プリミティブの使用
よりきめ細かな、または非同期に特化した並行処理のニーズのために、tokio::sync
はロックとチャネルの非同期バージョンを提供します。
tokio::sync::Mutex
: 待機可能な非同期Mutex
で、ロックを待っている間タスクを継続させることができ、エクゼキュータをブロックしません。tokio::sync::RwLock
: 同様の動作を持つ非同期RwLock
。tokio::sync::Semaphore
: 同時操作の数を制限するため。
原則:
これらの非同期プリミティブは、標準ライブラリの対応物と同様に動作しますが、async/.await
コンテキストにシームレスに適合します。これらは、ロックが多忙な場合にランタイムが他のタスクをスケジュールすることを可能にし、I/Oバウンド操作の全体的なスループットを向上させます。
実装(tokio::sync::Mutex
を使用したActix Web):
use actix_web:: web, App, HttpServer, Responder, HttpResponse, }; use std::collections::HashMap; use serde::{Deserialize, Serialize}; use tokio::sync::Mutex; use std::sync::Arc; // 共有所有権のためにまだArcが必要 #[derive(Debug, Clone, Serialize, Deserialize)] struct Item { id: u32, name: String, } // 非同期Mutexを持つ共有状態 struct AppState { item_cache: Arc<Mutex<HashMap<u32, Item>>>, } async fn add_item_async(state: web::Data<AppState>, item: web::Json<Item>) -> impl Responder { let mut cache = state.item_cache.lock().await; // ロックを待機 cache.insert(item.id, item.clone()); HttpResponse::Ok().json(item.0) } #[actix_web::main] async fn main() -> std::io::Result<()> { let shared_state = web::Data::new(AppState { item_cache: Arc::new(Mutex::new(HashMap::new())), }); HttpServer::new(move || { App::new() .app_data(shared_state.clone()) .service(web::resource("/items").route(web::post().to(add_item_async))) }) .bind(("127.0.0.1", 8081))? .run() .await }
アプリケーション:
共有リソースアクセスがasync
操作に関与している場合、または多数の同時非同期タスク間での競合ポイントになる可能性がある場合に適しています。sqlx
のようなライブラリのデータベース接続プールは通常、接続を待機する将来を返し、tokio::sync
パターンを非同期に互換性のあるものにします。
4. フレームワーク固有の状態管理(Actix Web web::Data
/ Axum State
)
Actix WebとAxumはどちらも、ハンドラに共有状態を注入するための独自の抽象化を提供しており、内部的には効率のためにArc
または参照カウントをよく活用しています。
原則:
フレームワークは、基盤となるArc
のクローンと共有ロジックを処理し、ハンドラ内で状態にアクセスすることを人間工学的にします。
- Actix Web
web::Data<T>
: アプリケーション状態をArc
でラップします。app_data()
でweb::Data
を登録すると、Actix WebはArc
を各ワーカースレッドにクローンし、各リクエストハンドラは参照カウントポインタを受け取ります。 - Axum
State<T>
: アプリケーション状態のAxumエクストラクタ。Clone
およびSend + Sync + 'static
を実装する必要があります。これは、with_state()
を使用する際に内部的にArc
でラップするためです。
実装: 上記のActix WebとAxumの両方の例は、すでにこれを示しています。
- Actix Web:
web::Data::new(AppState { ... })
およびapp_data(shared_state.clone())
の提供。ハンドラはstate: web::Data<AppState>
を受け取ります。 - Axum:
with_state(shared_state)
、ここでshared_state
はArc<AppState>
です。ハンドラはState(state): State<Arc<AppState>>
を受け取ります。
アプリケーション: これらは、各フレームワークでアプリケーションレベルの状態をハンドラに渡すための主要かつ推奨される方法です。フレームワークのアーキテクチャとシームレスに統合されています。
最適な戦略の選択
- 不変設定:
Arc<YourConfigStruct>
を使用します。読み取り専用データにとって最もシンプルで最もパフォーマンスが高いです。 - 変更可能、汎用:
std::sync
からのArc<Mutex<T>>
またはArc<RwLock<T>>
。これらは、多様な変更可能な共有リソースに対して堅牢です。読み取りが書き込みよりも大幅に頻繁な場合はRwLock
を使用し、そうでない場合はMutex
がしばしばシンプルで十分にパフォーマンスがあります。 - 変更可能、非同期対応:
Arc<tokio::sync::Mutex<T>>
またはArc<tokio::sync::RwLock<T>>
。アプリケーションがasync/.await
に大きく依存しており、ロックの競合がブロッキングを引き起こす可能性がある場合は、これらを優先します。 - データベース接続プール:
sqlx::PgPool
のようなライブラリは、内部的に接続を管理し、Send + Sync + 'static
である独自のPool
タイプを提供します。通常、これらをArc
(例:Arc<sqlx::PgPool>
)でラップしてから、web::Data
またはState
経由で渡します。
// sqlx PgPoolの例 use sqlx::PgPool; use std::sync::Arc; struct AppState { db_pool: Arc<PgPool>, // その他の共有状態 } // ... その後、web::Data<AppState>またはState<Arc<AppState>>を使用
結論
共有状態の管理は、効率的で正確なWebアプリケーションを構築するための基盤です。Rustは、その強力な型システムと並行処理プリミティブにより、これを安全に達成するための強力なツールを提供します。Arc
を共有所有権のために、Mutex
またはRwLock
を制御された変更性(標準およびtokio::sync
バージョン両方)のために、そしてActix Webのweb::Data
やAxumのState
のようなフレームワーク固有の抽象化を活用することで、開発者は高並列で堅牢なWebサービスを構築できます。鍵は、共有データの性質(不変か、頻繁に読み取られるか、頻繁に書き込まれるか)を理解し、パフォーマンスを犠牲にせずにスレッドセーフティを確保するために適切な同期プリミティブを選択することです。