FastAPI、SQLAlchemy 2.0、asyncpg を使った高パフォーマンスな非同期APIの構築
James Reed
Infrastructure Engineer · Leapcell

はじめに
Web開発が急速に進化する中で、パフォーマンスが高いだけでなくスケーラブルなアプリケーションを構築することが最も重要です。特にデータベースとのやり取りにおいて、従来の同期I/O操作はしばしばボトルネックとなり、ユーザーエクスペリエンスの低下やサーバーのスループットの低下につながります。そこで非同期プログラミングが威力を発揮します。これは、アプリケーションがノンブロッキング操作を実行できるようにするパラダイムシフトを提供し、それによってより多くの同時リクエストを効率的に処理できます。FastAPIは、その本質的な非同期機能により、高パフォーマンスなAPIの構築に最適なフレームワークとして台頭しています。しかし、非同期フレームワークを使用しても、データベース層が正しく処理されない場合、依然として同期的なブロッキングの原因となる可能性があります。この記事では、FastAPI、SQLAlchemy 2.0 の非同期エンジン、および asyncpg
ドライバーの強力な相乗効果を探り、非同期データベースインタラクションの可能性を最大限に活用する、堅牢で高並行なAPIの構築方法を実証します。
非同期データベースアクセスのためのコアコンポーネント
実装の詳細に入る前に、FastAPIアプリケーション内で非同期データベースアクセスを実現するために含まれる主要なテクノロジーについて明確に理解しておきましょう。
- FastAPI: Python 3.7+ をベースにしたAPI構築のための、モダンで高速(高パフォーマンス)なWebフレームワーク。標準的なPythonの型ヒントに基づいています。ネイティブな非同期サポートにより、I/Oバウンドなアプリケーションに最適です。
- SQLAlchemy 2.0: Pythonのための強力で柔軟なORM(オブジェクトリレーショナルマッパー)です。バージョン2.0は、特に非同期データベース操作のファーストクラスサポートを導入しており、モダンな非同期フレームワークとの連携がより緊密になっています。
- 非同期エンジン (SQLA 2.0): これは、SQLAlchemy の新しい
create_async_engine
および関連する非同期コンストラクトを指します。データベース応答を待つ間にイベントループをブロックする代わりに、これらの非同期コンポーネントにより、アプリケーションは他のタスクに切り替えることができ、並行処理が向上します。 - asyncpg: Pythonのための高速なPostgreSQLデータベースドライバーです。これは
asyncio
を使用した非同期I/Oのためにゼロから設計されており、非同期コンテキストで使用する場合、従来のドライバーと比較して優れたパフォーマンスを提供します。SQLAlchemy の非同期PostgreSQLバックエンドに推奨されるドライバーです。 async/await
: コルーチンの定義と実行のためのPythonのキーワード。コルーチンは、実行を一時停止して後で再開できる関数であり、ノンブロッキングI/Oを可能にします。
非同期データベースインタラクションの原則
SQLAlchemy 2.0 と asyncpg
を使用した非同期データベースインタラクションの核心的な原則は、すべてのデータベース操作がノンブロッキングであることを保証することです。async
関数がデータベースクエリを発行するとき、データベースからの応答を待つ代わりに、イベントループに制御を譲ります。イベントループは、データベース応答が準備できるまで、他の保留中のタスク(例:別の受信リクエストの処理)を実行できます。応答が到着したら、元の async
関数が再開されます。
非同期エンジンのセットアップとセッション
まず、asyncpg
を使用してSQLAlchemyの非同期機能を設定する必要があります。
まず、必要なライブラリがインストールされていることを確認してください。
pip install fastapi "uvicorn[standard]" sqlalchemy "asyncpg<0.29.0"
注意: SQLAlchemy 2.0.x シリーズでは、asyncpg
バージョン 0.29.0 以降は create_async_engine
で問題が発生する可能性があります。現時点では <0.29.0
にピン留めする方が安全です。
次に、非同期エンジンとセッションファクトリを設定しましょう。
# database.py from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession from sqlalchemy.orm import declarative_base # PostgreSQL接続文字列に置き換えてください # 'postgresql+asyncpg' プレフィックスはSQLAlchemyにasyncpgを使用するように指示します DATABASE_URL = "postgresql+asyncpg://user:password@host:port/dbname" # 非同期エンジンを作成 # pool_pre_ping=True は、長時間実行されるアプリケーションの接続を維持するのに役立ちます engine = create_async_engine(DATABASE_URL, echo=True, pool_pre_ping=True) # 非同期セッションメーカーを設定 # expire_on_commit=False は、コミット後にオブジェクトが期限切れ(デタッチ)になるのを防ぎます。 # これは、さらに操作を行うためにオブジェクトをセッションに接続したままにするのに役立ちます。 AsyncSessionLocal = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) Base = declarative_base() # 非同期データベースセッションを取得するための依存関係 async def get_db(): async with AsyncSessionLocal() as session: yield session
このセットアップでは:
create_async_engine
は、asyncpg
を使用してPostgreSQLデータベースに接続された非同期エンジンを初期化します。async_sessionmaker
は、AsyncSession
オブジェクトを作成するためのファクトリを提供します。get_db
はFastAPIの依存関係であり、AsyncSession
をyieldします。これにより、適切なセッションライフサイクル管理(作成とクローズ)が保証されます。
モデルの定義とマイグレーションの実行
同期SQLAlchemyと同様に、declarative_base
を使用してモデルを定義します。
# models.py from sqlalchemy import Column, Integer, String from database import Base class Item(Base): __tablename__ = "items" id = Column(Integer, primary_key=True, index=True) name = Column(String, index=True) description = Column(String) def __repr__(self): return f"<Item(id={self.id}, name='{self.name}')>" # テーブル作成の例(通常はalembicでマイグレーションを行います) async def create_db_and_tables(): async with engine.begin() as conn: await conn.run_sync(Base.metadata.create_all) # データベースを設定するために、通常はこの関数を一度実行します # import asyncio # async def main(): # await create_db_and_tables() # # if __name__ == "__main__": # asyncio.run(main())
FastAPIエンドポイントとの統合
次に、これらのコンポーネントをFastAPIアプリケーションに組み込んで、非同期CRUD操作を実行しましょう。
# main.py from fastapi import FastAPI, Depends, HTTPException, status from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.future import select from database import get_db, create_db_and_tables, engine, AsyncSessionLocal from models import Item from schemas import ItemCreate, ItemResponse # これらのPydanticモデルが定義されていると仮定 app = FastAPI() @app.on_event("startup") async def startup_event(): # 例: スタートアップ時にテーブルを作成(簡単なアプリでは、本番環境ではAlembicを使用してください) async with engine.begin() as conn: await conn.run_sync(Base.metadata.create_all) print("Database tables created (if not exist).") @app.post("/items/", response_model=ItemResponse, status_code=status.HTTP_201_CREATED) async def create_item(item: ItemCreate, db: AsyncSession = Depends(get_db)): db_item = Item(**item.dict()) db.add(db_item) await db.commit() # 非同期コミット await db.refresh(db_item) # 生成されたID/デフォルト値などを取得するためにリフレッシュ return db_item @app.get("/items/", response_model=list[ItemResponse]) async def read_items(skip: int = 0, limit: int = 10, db: AsyncSession = Depends(get_db)): # 非同期クエリ実行 result = await db.execute(select(Item).offset(skip).limit(limit)) items = result.scalars().all() return items @app.get("/items/{item_id}", response_model=ItemResponse) async def read_item(item_id: int, db: AsyncSession = Depends(get_db)): result = await db.execute(select(Item).filter(Item.id == item_id)) item = result.scalars().first() if item is None: raise HTTPException(status_code=404, detail="Item not found") return item # 例Pydanticモデル (schemas.py) # from pydantic import BaseModel # class ItemBase(BaseModel): # name: str # description: str | None = None # # class ItemCreate(ItemBase): # pass # # class ItemResponse(ItemBase): # id: int # # class Config: # orm_mode = True # SQLAlchemy ORM互換性のため
main.py
の例では:
startup_event
は、Base.metadata
の同期メソッドであるcreate_all
を使用しているため、conn.run_sync(Base.metadata.create_all)
を使用しています。conn.run_sync
は、同期呼び出しを非同期コンテキスト内で橋渡しし、スレッドプールで実行します。これは、同期操作を非同期フローに統合するための一般的なパターンです。- すべてのデータベース操作(
db.add
、db.commit
、db.execute
)はawait
でプレフィックスされ、非同期であることを示しています。 select(Item)
は、SQLAlchemy 2.0 でクエリを構築する最新の方法であり、より明示的な制御と優れた型推論を提供します。result.scalars().all()
は、クエリからすべてのスカラー結果を効率的に取得します。
実世界のシナリオでの応用
このパターンは以下に非常に適しています:
- マイクロサービス: データベースとやり取りする、独立した高パフォーマンスなサービスを構築する。
- リアルタイムAPI: チャットアプリケーション、ゲームバックエンド、または金融取引プラットフォームのように、低レイテンシと高並行処理が重要な場合。
- データ取り込みAPI: 大量のデータを効率的に受信および保存する。
- Webhook: ブロッキングせずに、外部サービスからの着信データを処理する。
SQLAlchemy の非同期エンジン内で asyncpg
を利用することで、FastAPIアプリケーションは従来の同期セットアップと比較して大幅に高い並行処理とスループットを達成できます。PostgreSQLインタラクションのために高度に最適化された asyncpg
のC実装と asyncio
のイベントループ管理を組み合わせることで、アイドル時間を最小限に抑え、APIがより多くの同時クライアント接続を効率的に処理できるようにします。
結論
FastAPI、SQLAlchemy 2.0 の非同期エンジン、および asyncpg
ドライバーの組み合わせは、Pythonで高パフォーマンスでスケーラブルなWeb APIを構築するための強力でモダンなスタックを表します。フレームワークレベルからデータベースドライバーまで、非同期プログラミングを採用することで、開発者は重負荷時にも迅速に応答し、システムリソースを効率的に利用するアプリケーションを作成できます。このアプローチはユーザーエクスペリエンスを向上させるだけでなく、将来のスケーラビリティのための堅牢な基盤を提供します。これらのツールを使用して非同期ネイティブアプリケーションを構築することは、モダンで効率的なWeb開発への重要な一歩です。