Python을 사용한 CQRS로 고도로 확장 가능한 비즈니스 시스템 구축
Ethan Miller
Product Engineer · Leapcell

소개
복잡한 비즈니스 애플리케이션의 세계에서 데이터를 효과적으로 관리하는 것이 가장 중요합니다. 시스템의 규모와 기능이 커짐에 따라 데이터 처리 메커니즘에 대한 요구 사항도 intensifies됩니다. 기존의 모놀리식 아키텍처는 종종 따라잡기 어려워 성능 병목 현상, 복잡성 증가 및 민첩성 유지의 어려움을 초래합니다. 분당 수천 건의 주문을 처리해야 하는 동시에 수백만 명의 사용자가 제품 카탈로그를 찾아볼 수 있도록 해야 하는 높은 트래픽을 처리하는 전자 상거래 플랫폼을 상상해 보세요. 단일 데이터베이스는 종종 병목 현상이 됩니다. 이곳에서 명령-쿼리 책임 분리(CQRS)와 같은 패턴이 강력한 대안을 제공합니다. CQRS는 데이터와 상호 작용하는 방식을 근본적으로 재고하여 복잡한 비즈니스 시스템의 성능, 확장성 및 유지 관리 용이성을 크게 향상시킬 수 있는 전문화된 접근 방식을 제공합니다. Python을 사용하여 CQRS를 활용하여 진정으로 복원력이 있고 고성능 애플리케이션을 구축하는 방법에 대해 알아보겠습니다.
CQRS의 기초 이해
실질적인 내용에 들어가기 전에 CQRS를 뒷받침하고 성공적인 구현에 중요한 핵심 개념을 명확히 해 보겠습니다.
명령-쿼리 책임 분리(CQRS): 핵심적으로 CQRS는 데이터 수정(명령) 처리 책임과 데이터 검색(쿼리) 처리 책임을 분리하는 패턴입니다. 두 작업 모두 수행하는 단일 모델이나 인터페이스 대신 별도의 모델이 있습니다.
- 명령: 시스템 상태 변경 의도를 나타내는 객체입니다. '무엇이 발생해야 하는가'에 초점을 맞춰 명령형입니다.
- 쿼리: 데이터 검색 의도를 나타내는 객체입니다. '어떤 데이터가 필요한가'에 초점을 맞춰 선언형입니다.
이벤트 소싱(CQRS와 함께 자주 사용됨): 엄격히 요구되지는 않지만 이벤트 소싱은 CQRS와 함께 자주 사용되는 패턴입니다. 집계의 현재 상태만 저장하는 대신 이벤트 소싱은 모든 상태 변경을 추가 전용 로그에 불변하는 이벤트로 영구 저장합니다. 그런 다음 현재 상태는 이러한 이벤트를 다시 재생하여 재구성됩니다. 이를 통해 완전한 감사 추적이 가능해지고 시간 여행 및 디버깅과 같은 강력한 기능을 사용할 수 있습니다.
도메인 주도 설계(DDD): CQRS 및 이벤트 소싱은 종종 DDD 원칙에 대한 강력한 이해로부터 이익을 얻습니다. DDD는 도메인 전문가의 입력을 기반으로 소프트웨어를 도메인에 맞춰 모델링하는 데 중점을 둡니다. 집계, 엔터티, 값 객체와 같은 개념은 명령 및 이벤트에 대한 명확한 경계를 정의하는 데 도움이 됩니다.
CQRS가 해결하는 문제
일반적인 생성, 읽기, 업데이트, 삭제(CRUD) 애플리케이션에서 읽기 및 쓰기 작업은 종종 동일한 데이터 모델과 인프라를 공유합니다. 이는 간단한 애플리케이션의 경우 잘 작동하지만 복잡한 시스템에서는 이 통합된 접근 방식이 여러 가지 문제를 야기할 수 있습니다.
- 성능 병목 현상: 읽기 모델은 종종 속도와 비정규화에 최적화되어 있고 쓰기 모델은 일관성과 정규화에 우선 순위를 두지만; 단일 모델을 사용하면 절충이 강제됩니다.
- 확장성 문제: 읽기 작업이 쓰기 작업을 훨씬 더 많이 능가합니다. 높은 읽기 및 쓰기 부하를 모두 효율적으로 처리하기 위해 결합된 읽기/쓰기 데이터베이스를 확장하는 것은 어렵고 비용이 많이 들 수 있습니다.
- 복잡성: 읽기(예: 복잡한 보고, 검색)와 쓰기(예: 트랜잭션 무결성)에 대한 다양한 비즈니스 요구 사항으로 인해 단일 모델이 지나치게 복잡해지고 관리가 어려워질 수 있습니다.
- 보안 문제: 읽기 작업에 적용된 세분화된 보안은 쓰기 작업과 크게 다를 수 있습니다.
CQRS 작동 방식 및 Python에서의 구현
CQRS의 핵심 원칙은 간단합니다. 명령과 쿼리에 대한 별도의 경로입니다.
-
**명령 경로(쓰기 측):
- 명령은 클라이언트(예: 사용자 인터페이스, 다른 서비스)에 의해 발급됩니다.
- 명령 핸들러가 명령을 수신합니다.
- 명령 핸들러는 관련 집계(쓰기 모델 데이터베이스, 이벤트 소싱을 사용하는 경우 이벤트 저장소)를 로드합니다.
- 집계는 비즈니스 논리를 수행하고 명령을 확인하며 도메인 이벤트를 발생시킵니다.
- 이러한 이벤트는 이벤트 저장소에 영구 저장됩니다.
- 선택적으로 이벤트는 이벤트 버스에 게시됩니다.
-
**쿼리 경로(읽기 측):
- 쿼리는 클라이언트에서 발급됩니다.
- 쿼리 핸들러가 쿼리를 수신합니다.
- 쿼리 핸들러는 읽기 모델 데이터베이스에서 직접 데이터를 검색합니다. 이 데이터베이스는 일반적으로 비정규화되어 있으며 빠른 읽기(예: 문서 데이터베이스, 검색 인덱스 또는 고도로 최적화된 관계형 데이터베이스 뷰)에 대해 특별히 최적화되어 있습니다.
- 데이터는 클라이언트에 반환됩니다.
읽기 모델이 최신 상태로 유지되는 방식에서 마법이 일어납니다. 이벤트가 이벤트 버스(명령 경로에서)에 게시되면 **읽기 모델 프로젝터(또는 이벤트 핸들러)**가 이러한 이벤트를 소비하고 비정규화된 읽기 모델 데이터베이스를 업데이트합니다. 이 비동기 업데이트는 쓰기 측이 읽기 측과 분리되어 독립적인 확장 및 최적화를 허용하도록 보장합니다.
Order
시스템에 대한 단순화된 Python 예를 통해 설명해 보겠습니다.
import abc import uuid from datetime import datetime from typing import Dict, List, Optional, Type, TypeVar # --- 1. 도메인 이벤트 (발생한 일에 대한 불변의 사실) --- class DomainEvent: def __init__(self, aggregate_id: str, timestamp: datetime = None): self.aggregate_id = aggregate_id self.timestamp = timestamp or datetime.utcnow() class OrderCreated(DomainEvent): def __init__(self, order_id: str, customer_id: str, items: Dict[str, int], total_amount: float): super().__init__(order_id) self.customer_id = customer_id self.items = items self.total_amount = total_amount class OrderItemQuantityAdjusted(DomainEvent): def __init__(self, order_id: str, item_name: str, new_quantity: int): super().__init__(order_id) self.item_name = item_name self.new_quantity = new_quantity class OrderShipped(DomainEvent): def __init__(self, order_id: str): super().__init__(order_id) # --- 2. 집계 (비즈니스 로직 및 이벤트를 통한 상태 변경) --- class OrderAggregate: def __init__(self, order_id: str): self.id = order_id self.customer_id: Optional[str] = None self.items: Dict[str, int] = {} self.total_amount: float = 0.0 self.status: str = "PENDING" self._uncommitted_events: List[DomainEvent] = [] def apply(self, event: DomainEvent): """이벤트를 집계 상태에 적용합니다.""" if isinstance(event, OrderCreated): self.customer_id = event.customer_id self.items = event.items self.total_amount = event.total_amount self.status = "CREATED" elif isinstance(event, OrderItemQuantityAdjusted): if event.item_name in self.items: self.items[event.item_name] = event.new_quantity # 실제 시스템에서는 total_amount를 다시 계산합니다. else: raise ValueError(f"Item {event.item_name} not in order.") elif isinstance(event, OrderShipped): self.status = "SHIPPED" # 필요한 경우 더 많은 이벤트 핸들러를 추가합니다. @classmethod def create(cls, customer_id: str, items: Dict[str, int], total_amount: float): order_id = str(uuid.uuid4()) aggregate = cls(order_id) event = OrderCreated(order_id, customer_id, items, total_amount) aggregate.apply(event) aggregate._uncommitted_events.append(event) return aggregate def adjust_item_quantity(self, item_name: str, new_quantity: int): if self.status != "CREATED": raise ValueError("Cannot adjust items for an order that is not in 'CREATED' status.") event = OrderItemQuantityAdjusted(self.id, item_name, new_quantity) self.apply(event) self._uncommitted_events.append(event) def ship_order(self): if self.status != "CREATED": raise ValueError("Only 'CREATED' orders can be shipped.") event = OrderShipped(self.id) self.apply(event) self._uncommitted_events.append(event) def get_uncommitted_events(self) -> List[DomainEvent]: return self._uncommitted_events def clear_uncommitted_events(self): self._uncommitted_events = [] # --- 3. 명령 (상태 변경 요청) --- class Command: pass class CreateOrderCommand(Command): def __init__(self, customer_id: str, items: Dict[str, int], total_amount: float): self.customer_id = customer_id self.items = items self.total_amount = total_amount class AdjustOrderItemQuantityCommand(Command): def __init__(self, order_id: str, item_name: str, new_quantity: int): self.order_id = order_id self.item_name = item_name self.new_quantity = new_quantity class ShipOrderCommand(Command): def __init__(self, order_id: str): self.order_id = order_id # --- 4. 명령 핸들러 (명령을 처리하고 이벤트를 생성) --- class CommandHandler(abc.ABC): @abc.abstractmethod def handle(self, command: Command) -> None: pass # 시연을 위한 가상의 이벤트 저장소 및 이벤트 버스 class EventStore: def __init__(self): self.events: Dict[str, List[DomainEvent]] = {} def save_events(self, aggregate_id: str, new_events: List[DomainEvent]): if aggregate_id not in self.events: self.events[aggregate_id] = [] self.events[aggregate_id].extend(new_events) print(f"Saved {len(new_events)} events for aggregate {aggregate_id}. Total: {len(self.events[aggregate_id])}") def get_events_for_aggregate(self, aggregate_id: str) -> List[DomainEvent]: return self.events.get(aggregate_id, []) class EventBus: def __init__(self): self._handlers: Dict[Type[DomainEvent], List[callable]] = {} def subscribe(self, event_type: Type[DomainEvent], handler: callable): if event_type not in self._handlers: self._handlers[event_type] = [] self._handlers[event_type].append(handler) def publish(self, event: DomainEvent): for handler in self._handlers.get(type(event), []): handler(event) print(f"Published event: {type(event).__name__} for aggregate {event.aggregate_id}") class CreateOrderCommandHandler(CommandHandler): def __init__(self, event_store: EventStore, event_bus: EventBus): self.event_store = event_store self.event_bus = event_bus def handle(self, command: CreateOrderCommand): order = OrderAggregate.create(command.customer_id, command.items, command.total_amount) self.event_store.save_events(order.id, order.get_uncommitted_events()) for event in order.get_uncommitted_events(): self.event_bus.publish(event) order.clear_uncommitted_events() return order.id class AdjustOrderItemQuantityCommandHandler(CommandHandler): def __init__(self, event_store: EventStore, event_bus: EventBus): self.event_store = event_store self.event_bus = event_bus def handle(self, command: AdjustOrderItemQuantityCommand): events = self.event_store.get_events_for_aggregate(command.order_id) order = OrderAggregate(command.order_id) for event in events: order.apply(event) # 상태 재구성 order.adjust_item_quantity(command.item_name, command.new_quantity) self.event_store.save_events(order.id, order.get_uncommitted_events()) for event in order.get_uncommitted_events(): self.event_bus.publish(event) order.clear_uncommitted_events() class ShipOrderCommandHandler(CommandHandler): def __init__(self, event_store: EventStore, event_bus: EventBus): self.event_store = event_store self.event_bus = event_bus def handle(self, command: ShipOrderCommand): events = self.event_store.get_events_for_aggregate(command.order_id) order = OrderAggregate(command.order_id) for event in events: order.apply(event) # 상태 재구성 order.ship_order() self.event_store.save_events(order.id, order.get_uncommitted_events()) for event in order.get_uncommitted_events(): self.event_bus.publish(event) order.clear_uncommitted_events() # --- 5. 읽기 모델 (비정규화, 쿼리에 최적화) --- class OrderReadModel: def __init__(self, order_id: str, customer_id: str, items: Dict[str, int], total_amount: float, status: str, created_at: datetime): self.id = order_id self.customer_id = customer_id self.items = items self.total_amount = total_amount self.status = status self.created_at = created_at # 가상의 읽기 모델 데이터베이스 class OrderReadModelDatabase: def __init__(self): self.orders: Dict[str, OrderReadModel] = {} def save(self, read_model: OrderReadModel): self.orders[read_model.id] = read_model print(f"Read model updated for order {read_model.id} (Status: {read_model.status})") def get_by_id(self, order_id: str) -> Optional[OrderReadModel]: return self.orders.get(order_id) def get_all_pending_orders(self) -> List[OrderReadModel]: return [order for order in self.orders.values() if order.status == "CREATED"] # --- 6. 읽기 모델 프로젝터 (이벤트에서 읽기 모델 업데이트) --- class OrderReadModelProjector: def __init__(self, read_model_db: OrderReadModelDatabase): self.read_model_db = read_model_db def handle_order_created(self, event: OrderCreated): read_model = OrderReadModel( order_id=event.aggregate_id, customer_id=event.customer_id, items=event.items, total_amount=event.total_amount, status="CREATED", # 초기 상태 created_at=event.timestamp ) self.read_model_db.save(read_model) def handle_order_item_quantity_adjusted(self, event: OrderItemQuantityAdjusted): existing_read_model = self.read_model_db.get_by_id(event.aggregate_id) if existing_read_model: existing_read_model.items[event.item_name] = event.new_quantity # 단순화를 위해 실제 시스템에서는 total_amount를 다시 계산합니다. self.read_model_db.save(existing_read_model) def handle_order_shipped(self, event: OrderShipped): existing_read_model = self.read_model_db.get_by_id(event.aggregate_id) if existing_read_model: existing_read_model.status = "SHIPPED" self.read_model_db.save(existing_read_model) # --- 7. 쿼리 (읽기 모델에서 데이터 검색) --- class GetOrderByIdQuery: def __init__(self, order_id: str): self.order_id = order_id class GetPendingOrdersQuery: pass class GetOrderByIdQueryHandler: def __init__(self, read_model_db: OrderReadModelDatabase): self.read_model_db = read_model_db def handle(self, query: GetOrderByIdQuery) -> Optional[OrderReadModel]: return self.read_model_db.get_by_id(query.order_id) class GetPendingOrdersQueryHandler: def __init__(self, read_model_db: OrderReadModelDatabase): self.read_model_db = read_model_db def handle(self, query: GetPendingOrdersQuery) -> List[OrderReadModel]: return self.read_model_db.get_all_pending_orders() # --- 오케스트레이션 --- class Application: def __init__(self): self.event_store = EventStore() self.event_bus = EventBus() self.read_model_db = OrderReadModelDatabase() # 명령 핸들러 등록 self.create_order_cmd_handler = CreateOrderCommandHandler(self.event_store, self.event_bus) self.adjust_item_cmd_handler = AdjustOrderItemQuantityCommandHandler(self.event_store, self.event_bus) self.ship_order_cmd_handler = ShipOrderCommandHandler(self.event_store, self.event_bus) # 쿼리 핸들러 등록 self.get_order_by_id_query_handler = GetOrderByIdQueryHandler(self.read_model_db) self.get_pending_orders_query_handler = GetPendingOrdersQueryHandler(self.read_model_db) # 이벤트 핸들러 (프로젝터) 등록하여 읽기 모델 재구축 self.order_projector = OrderReadModelProjector(self.read_model_db) self.event_bus.subscribe(OrderCreated, self.order_projector.handle_order_created) self.event_bus.subscribe(OrderItemQuantityAdjusted, self.order_projector.handle_order_item_quantity_adjusted) self.event_bus.subscribe(OrderShipped, self.order_projector.handle_order_shipped) def execute_command(self, command: Command): if isinstance(command, CreateOrderCommand): return self.create_order_cmd_handler.handle(command) elif isinstance(command, AdjustOrderItemQuantityCommand): self.adjust_item_cmd_handler.handle(command) elif isinstance(command, ShipOrderCommand): self.ship_order_cmd_handler.handle(command) else: raise ValueError(f"Unknown command: {type(command)}") def execute_query(self, query): if isinstance(query, GetOrderByIdQuery): return self.get_order_by_id_query_handler.handle(query) elif isinstance(query, GetPendingOrdersQuery): return self.get_pending_orders_query_handler.handle(query) else: raise ValueError(f"Unknown query: {type(query)}") # --- 클라이언트 사용 --- if __name__ == "__main__": app = Application() # --- 명령 측 --- print("--- Executing Commands ---") create_order_cmd = CreateOrderCommand( customer_id="cust-123", items={"Laptop": 1, "Mouse": 1}, total_amount=1200.00 ) order_id = app.execute_command(create_order_cmd) print(f"New Order Created with ID: {order_id}") adjust_item_cmd = AdjustOrderItemQuantityCommand( order_id=order_id, item_name="Mouse", new_quantity=2 ) app.execute_command(adjust_item_cmd) ship_order_cmd = ShipOrderCommand(order_id=order_id) app.execute_command(ship_order_cmd) # --- 쿼리 측 --- print("\n--- Executing Queries ---") # 특정 주문에 대한 쿼리 query_order_by_id = GetOrderByIdQuery(order_id=order_id) order_details = app.execute_query(query_order_by_id) if order_details: print(f"Query Result for Order {order_details.id}:") print(f" Customer: {order_details.customer_id}") print(f" Items: {order_details.items}") print(f" Total: {order_details.total_amount}") print(f" Status: {order_details.status}") else: print(f"Order {order_id} not found in read model.") # 보류 중인 주문을 시연하기 위해 다른 주문 생성 new_order_id = app.execute_command(CreateOrderCommand("cust-456", {"Book": 3}, 75.00)) print(f"Another Order Created with ID: {new_order_id}") # 보류 중인 주문에 대한 쿼리 pending_orders = app.execute_query(GetPendingOrdersQuery()) print("\nPending Orders:") for order in pending_orders: print(f" Order ID: {order.id}, Customer: {order.customer_id}, Status: {order.status}")
코드 예제 설명:
DomainEvent
: 시스템에서 발생한 일에 대한 불변의 사실(예:OrderCreated
,OrderItemQuantityAdjusted
)을 나타냅니다.OrderAggregate
: 쓰기 모델의 핵심입니다. 비즈니스 규칙을 적용하고DomainEvent
를 생성할 책임이 있습니다. 상태를 직접 수정하는 것이 아니라 이벤트를apply
하여 상태를 재구성합니다._uncommitted_events
목록을 유지합니다.Command
: 의도를 표현하는 간단한 데이터 구조(예:CreateOrderCommand
,ShipOrderCommand
).CommandHandler
: 명령을 가져와 집계를 로드합니다(이전 이벤트에서EventStore
에서 상태 재구성). 집계에서 필요한 작업을 수행한 다음 새로 생성된 이벤트를EventStore
에 영구 저장하고EventBus
에 게시합니다.EventStore
: 이벤트에 대한 간단한 인메모리 저장소입니다. 실제 애플리케이션에서는 영구 데이터베이스(PostgreSQL, DynamoDB 또는 전용 이벤트 저장소)입니다.EventBus
: 등록된 핸들러에 이벤트를 게시하는 간단한 디스패처입니다. 프로덕션 시스템에서는 Kafka, RabbitMQ 또는 AWS SNS/SQS일 수 있습니다.OrderReadModel
: 표시 및 쿼리에 최적화된 주문 데이터의 비정규화된 평면 프로젝션입니다.OrderReadModelDatabase
: 읽기 모델에 대한 간단한 인메모리 데이터베이스입니다. 일반적으로 NoSQL 데이터베이스(MongoDB, Elasticsearch), 고도로 비정규화된 관계형 테이블 또는 검색 인덱스입니다.OrderReadModelProjector
:EventBus
에서DomainEvent
를 소비하고OrderReadModelDatabase
를 업데이트합니다. 이곳은 비정규화 및 읽기 모델 최적화가 발생하는 곳입니다.Query
: 정보에 대한 요청을 나타내는 간단한 데이터 구조(예:GetOrderByIdQuery
).QueryHandler
: 복잡한 비즈니스 논리 없이OrderReadModelDatabase
에서 직접 데이터를 검색합니다.
이 설정은 책임을 명확하게 분리합니다. OrderAggregate
는 쓰기 중에 비즈니스 규칙 및 일관성을 보장하는 데 전념하며, ReadModelProjector
는 쿼리에 대한 최적화된 뷰를 독립적으로 구축합니다.
애플리케이션 시나리오
CQRS는 모든 문제를 해결하기 위한 만능 해결책은 아니지만 특정 복잡한 시나리오에서 빛을 발합니다.
- 고성능 읽기/쓰기 시스템: 읽기 및 쓰기 부하가 현저히 다른 시스템으로, 각 시스템을 독립적으로 확장하는 것이 중요합니다(예: 소셜 미디어 피드, 전자 상거래 플랫폼, IoT 데이터 처리).
- 복잡한 도메인 로직: 쓰기 모델에 정교한 비즈니스 규칙 적용 및 무결성 확인이 필요한 경우(종종 이벤트 소싱과 함께 사용됨).
- 데이터 보고 및 분석: 트랜잭션 성능에 영향을 주지 않고 복잡한 보고 쿼리에 최적화된 특수 읽기 모델 구축.
- 외부 시스템과의 통합: 다른 소비자에게 맞춤화된 읽기 모델을 제공하거나 타사 분석 도구와 통합.
- 이벤트 중심 아키텍처: 서비스가 이벤트를 통해 통신하는 마이크로 서비스 및 이벤트 중심 패턴과 자연스럽게 일치합니다.
- 감사 및 디버깅: CQRS와 함께 사용되는 이벤트 소싱은 모든 변경 사항에 대한 완전한 시간 기록을 제공하므로 감사, 디버깅 및 과거 상태 재현에 매우 중요합니다.
결론
Python에서 명령-쿼리 책임 분리(CQRS)를 구현하면 복잡하고 확장 가능하며 유지 관리 가능한 비즈니스 시스템을 구축하기 위한 강력한 아키텍처 패턴을 제공합니다. 데이터를 수정하는 모델과 쿼리하는 모델을 명시적으로 분리함으로써 독립적인 최적화, 향상된 성능 및 증가된 복원력의 기회를 열 수 있습니다. 복잡성이 증가하지만 특정 고부하 시나리오(향상된 확장성부터 명확한 도메인 모델링까지)에서의 이점은 CQRS를 최신 개발자 도구의 귀중한 도구로 만듭니다. CQRS를 활용하여 성능이 뛰어날 뿐만 아니라 끊임없이 변화하는 요구 사항에 따라 우아하게 발전할 수 있는 비즈니스 애플리케이션을 구축하십시오.