Aufbau hoch skalierbarer Geschäftssysteme mit CQRS in Python
Ethan Miller
Product Engineer · Leapcell

Einführung
In der Welt komplexer Geschäftsanwendungen ist eine effektive Datenverwaltung von größter Bedeutung. Wenn Systeme in Bezug auf Umfang und Funktionalität wachsen, nehmen die Anforderungen an unsere Datenhandhabungsmechanismen zu. Traditionelle monolithische Architekturen haben oft Mühe, Schritt zu halten, was zu Leistungseinbußen, erhöhter Komplexität und Herausforderungen bei der Aufrechterhaltung der Agilität führt. Stellen Sie sich ein Szenario vor, in dem eine stark frequentierte E-Commerce-Plattform Tausende von Bestellungen pro Minute verarbeiten muss, während gleichzeitig Millionen von Benutzern Produktkataloge durchsuchen können. Eine einzige Datenbank wird oft zu einem Engpass. Hier bieten Muster wie Command Query Responsibility Segregation (CQRS) eine leistungsstarke Alternative. CQRS überdenkt grundsätzlich, wie wir mit unseren Daten interagieren, und bietet einen spezialisierten Ansatz, der die Leistung, Skalierbarkeit und Wartbarkeit komplexer Geschäftssysteme erheblich verbessern kann. Tauchen wir ein, wie wir CQRS mit Python nutzen können, um wirklich widerstandsfähige und hochleistungsfähige Anwendungen zu erstellen.
Grundlagen von CQRS verstehen
Bevor wir uns mit den praktischen Aspekten befassen, lassen Sie uns die Kernkonzepte klären, die CQRS zugrunde liegen und für eine erfolgreiche Implementierung von entscheidender Bedeutung sind.
Command Query Responsibility Segregation (CQRS): Im Kern ist CQRS ein Muster, das die Verantwortung für die Verarbeitung von Datenänderungen (Befehle) von der Verantwortung für die Datenabfrage (Anfragen) trennt. Anstelle eines einzelnen Modells oder einer einzelnen Schnittstelle, die beide Aktionen ausführt, haben Sie unterschiedliche Modelle:
- Befehle (Commands): Objekte, die die Absicht darstellen, den Zustand des Systems zu ändern. Sie sind imperativ und konzentrieren sich darauf, "was geschehen muss".
- Anfragen (Queries): Objekte, die die Absicht darstellen, Daten abzurufen. Sie sind deklarativ und konzentrieren sich darauf, "welche Daten benötigt werden".
Event Sourcing (oft mit CQRS kombiniert): Obwohl nicht zwingend erforderlich, wird Event Sourcing oft in Verbindung mit CQRS verwendet. Anstatt nur den aktuellen Zustand eines Aggregats zu speichern, speichert Event Sourcing jede Zustandsänderung als unveränderliches Ereignis in einem nur anhängbaren Protokoll. Der aktuelle Zustand wird dann durch das Wiedergeben dieser Ereignisse rekonstruiert. Dies bietet eine vollständige Audit-Trail und ermöglicht leistungsstarke Funktionen wie Zeitreisen und Debugging.
Domain-Driven Design (DDD): CQRS und Event Sourcing profitieren oft von einem soliden Verständnis der DDD-Prinzipien. DDD betont die Modellierung von Software, um eine Domäne gemäß der Eingabe von Domänenexperten abzugleichen. Konzepte wie Aggregate, Entitäten und Wertobjekte helfen bei der Definition klarer Grenzen für Befehle und Ereignisse.
Das Problem, das CQRS löst
In einer typischen Create, Read, Update, Delete (CRUD)-Anwendung teilen sich Lese- und Schreiboperationen oft dasselbe Datenmodell und dieselbe Infrastruktur. Dies funktioniert gut für einfachere Anwendungen, aber in komplexen Systemen kann dieser einheitliche Ansatz zu mehreren Herausforderungen führen:
- Leistungseinbußen: Read-Modelle sind oft für Geschwindigkeit und Denormalisierung optimiert, während Write-Modelle Konsistenz und Normalisierung priorisieren. Die Verwendung eines einzelnen Modells erzwingt Kompromisse.
- Skalierbarkeitsprobleme: Lesevorgänge übertreffen Schreibvorgänge oft erheblich. Die Skalierung einer kombinierten Lese-/Schreibdatenbank, um sowohl hohe Lese- als auch hohe Schreiblasten effizient zu bewältigen, kann schwierig und teuer sein.
- Komplexität: Unterschiedliche Geschäftsanforderungen für Leseoperationen (z. B. komplexe Berichterstattung, Suche) und Schreiboperationen (z. B. transaktionale Integrität) können ein einzelnes Modell übermäßig komplex und schwer zu verwalten machen.
- Sicherheitsbedenken: Granulare Sicherheit, die auf Leseoperationen angewendet wird, kann sich erheblich von Schreiboperationen unterscheiden.
Wie CQRS funktioniert und seine Implementierung in Python
Das Kernprinzip von CQRS ist unkompliziert: getrennte Pfade für Befehle und Anfragen.
-
**Befehlspfad (Schreibseite):
- Ein Befehl wird von einem Client (z. B. Benutzeroberfläche, anderer Dienst) ausgegeben.
- Ein Befehlshandler empfängt den Befehl.
- Der Befehlshandler lädt das relevante Aggregat (aus der Schreibmodell-Datenbank, oft einem Event Store, wenn Event Sourcing verwendet wird).
- Das Aggregat führt seine Geschäftslogik aus, validiert den Befehl und emittiert Domänenereignisse.
- Diese Ereignisse werden in einem Event Store gespeichert.
- Optional werden die Ereignisse über einen Event Bus veröffentlicht.
-
**Abfragepfad (Leseseite):
- Eine Anfrage wird von einem Client ausgegeben.
- Ein Anfragehandler empfängt die Anfrage.
- Der Anfragehandler ruft Daten direkt aus einer Read Model-Datenbank ab. Diese Datenbank ist typischerweise denormalisiert und speziell für schnelle Lesevorgänge optimiert (z. B. eine Dokumentendatenbank, ein Suchindex oder sogar eine hoch optimierte relationale Datenbankansicht).
- Die Daten werden an den Client zurückgegeben.
Die Magie geschieht darin, wie das Read-Modell auf dem neuesten Stand gehalten wird. Wenn Ereignisse vom Befehlspfad auf dem Event Bus veröffentlicht werden, verbrauchen Read Model Projectors (oder Event Handler) diese Ereignisse und aktualisieren die denormalisierte Read Model-Datenbank. Diese asynchrone Aktualisierung stellt sicher, dass die Schreibseite von der Leseseite entkoppelt bleibt, was eine unabhängige Skalierung und Optimierung ermöglicht.
Lassen Sie uns dies anhand eines vereinfachten Python-Beispiels für ein Order
-System veranschaulichen.
import abc import uuid from datetime import datetime from typing import Dict, List, Optional, Type, TypeVar # --- 1. Domänenereignisse (Unveränderliche Fakten über etwas, das passiert ist) --- class DomainEvent: def __init__(self, aggregate_id: str, timestamp: datetime = None): self.aggregate_id = aggregate_id self.timestamp = timestamp or datetime.utcnow() class OrderCreated(DomainEvent): def __init__(self, order_id: str, customer_id: str, items: Dict[str, int], total_amount: float): super().__init__(order_id) self.customer_id = customer_id self.items = items self.total_amount = total_amount class OrderItemQuantityAdjusted(DomainEvent): def __init__(self, order_id: str, item_name: str, new_quantity: int): super().__init__(order_id) self.item_name = item_name self.new_quantity = new_quantity class OrderShipped(DomainEvent): def __init__(self, order_id: str): super().__init__(order_id) # --- 2. Aggregat (Geschäftslogik und Zustandsänderungen durch Ereignisse) --- class OrderAggregate: def __init__(self, order_id: str): self.id = order_id self.customer_id: Optional[str] = None self.items: Dict[str, int] = {} self.total_amount: float = 0.0 self.status: str = "PENDING" self._uncommitted_events: List[DomainEvent] = [] def apply(self, event: DomainEvent): """Wendet ein Ereignis auf den Zustand des Aggregats an.""" if isinstance(event, OrderCreated): self.customer_id = event.customer_id self.items = event.items self.total_amount = event.total_amount self.status = "CREATED" elif isinstance(event, OrderItemQuantityAdjusted): if event.item_name in self.items: self.items[event.item_name] = event.new_quantity # Recalculate total_amount in a real system else: raise ValueError(f"Item {event.item_name} not in order.") elif isinstance(event, OrderShipped): self.status = "SHIPPED" # Add more event handlers as needed for other events @classmethod def create(cls, customer_id: str, items: Dict[str, int], total_amount: float): order_id = str(uuid.uuid4()) aggregate = cls(order_id) event = OrderCreated(order_id, customer_id, items, total_amount) aggregate.apply(event) aggregate._uncommitted_events.append(event) return aggregate def adjust_item_quantity(self, item_name: str, new_quantity: int): if self.status != "CREATED": raise ValueError("Cannot adjust items for an order that is not in 'CREATED' status.") event = OrderItemQuantityAdjusted(self.id, item_name, new_quantity) self.apply(event) self._uncommitted_events.append(event) def ship_order(self): if self.status != "CREATED": raise ValueError("Only 'CREATED' orders can be shipped.") event = OrderShipped(self.id) self.apply(event) self._uncommitted_events.append(event) def get_uncommitted_events(self) -> List[DomainEvent]: return self._uncommitted_events def clear_uncommitted_events(self): self._uncommitted_events = [] # --- 3. Befehle (Anfragen zur Zustandsänderung) --- class Command: pass class CreateOrderCommand(Command): def __init__(self, customer_id: str, items: Dict[str, int], total_amount: float): self.customer_id = customer_id self.items = items self.total_amount = total_amount class AdjustOrderItemQuantityCommand(Command): def __init__(self, order_id: str, item_name: str, new_quantity: int): self.order_id = order_id self.item_name = item_name self.new_quantity = new_quantity class ShipOrderCommand(Command): def __init__(self, order_id: str): self.order_id = order_id # --- 4. Befehlshandler (Verarbeiten Befehle, erzeugen Ereignisse) --- class CommandHandler(abc.ABC): @abc.abstractmethod def handle(self, command: Command) -> None: pass # Fiktiver Event Store und Event Bus zur Demonstration class EventStore: def __init__(self): self.events: Dict[str, List[DomainEvent]] = {} def save_events(self, aggregate_id: str, new_events: List[DomainEvent]): if aggregate_id not in self.events: self.events[aggregate_id] = [] self.events[aggregate_id].extend(new_events) print(f"Saved {len(new_events)} events for aggregate {aggregate_id}. Total: {len(self.events[aggregate_id])}") def get_events_for_aggregate(self, aggregate_id: str) -> List[DomainEvent]: return self.events.get(aggregate_id, []) class EventBus: def __init__(self): self._handlers: Dict[Type[DomainEvent], List[callable]] = {} def subscribe(self, event_type: Type[DomainEvent], handler: callable): if event_type not in self._handlers: self._handlers[event_type] = [] self._handlers[event_type].append(handler) def publish(self, event: DomainEvent): for handler in self._handlers.get(type(event), []): handler(event) print(f"Published event: {type(event).__name__} for aggregate {event.aggregate_id}") class CreateOrderCommandHandler(CommandHandler): def __init__(self, event_store: EventStore, event_bus: EventBus): self.event_store = event_store self.event_bus = event_bus def handle(self, command: CreateOrderCommand): order = OrderAggregate.create(command.customer_id, command.items, command.total_amount) self.event_store.save_events(order.id, order.get_uncommitted_events()) for event in order.get_uncommitted_events(): self.event_bus.publish(event) order.clear_uncommitted_events() return order.id class AdjustOrderItemQuantityCommandHandler(CommandHandler): def __init__(self, event_store: EventStore, event_bus: EventBus): self.event_store = event_store self.event_bus = event_bus def handle(self, command: AdjustOrderItemQuantityCommand): events = self.event_store.get_events_for_aggregate(command.order_id) order = OrderAggregate(command.order_id) for event in events: order.apply(event) # Reconstruct state order.adjust_item_quantity(command.item_name, command.new_quantity) self.event_store.save_events(order.id, order.get_uncommitted_events()) for event in order.get_uncommitted_events(): self.event_bus.publish(event) order.clear_uncommitted_events() class ShipOrderCommandHandler(CommandHandler): def __init__(self, event_store: EventStore, event_bus: EventBus): self.event_store = event_store self.event_bus = event_bus def handle(self, command: ShipOrderCommand): events = self.event_store.get_events_for_aggregate(command.order_id) order = OrderAggregate(command.order_id) for event in events: order.apply(event) # Reconstruct state order.ship_order() self.event_store.save_events(order.id, order.get_uncommitted_events()) for event in order.get_uncommitted_events(): self.event_bus.publish(event) order.clear_uncommitted_events() # --- 5. Read Model (Denormalisierte Projektion, optimiert für Abfragen) --- class OrderReadModel: def __init__(self, order_id: str, customer_id: str, items: Dict[str, int], total_amount: float, status: str, created_at: datetime): self.id = order_id self.customer_id = customer_id self.items = items self.total_amount = total_amount self.status = status self.created_at = created_at # Fiktive Read Model-Datenbank class OrderReadModelDatabase: def __init__(self): self.orders: Dict[str, OrderReadModel] = {} def save(self, read_model: OrderReadModel): self.orders[read_model.id] = read_model print(f"Read model updated for order {read_model.id} (Status: {read_model.status})") def get_by_id(self, order_id: str) -> Optional[OrderReadModel]: return self.orders.get(order_id) def get_all_pending_orders(self) -> List[OrderReadModel]: return [order for order in self.orders.values() if order.status == "CREATED"] # --- 6. Read Model Projectors (Aktualisieren Read Model aus Ereignissen) --- class OrderReadModelProjector: def __init__(self, read_model_db: OrderReadModelDatabase): self.read_model_db = read_model_db def handle_order_created(self, event: OrderCreated): read_model = OrderReadModel( order_id=event.aggregate_id, customer_id=event.customer_id, items=event.items, total_amount=event.total_amount, status="CREATED", # Initial status created_at=event.timestamp ) self.read_model_db.save(read_model) def handle_order_item_quantity_adjusted(self, event: OrderItemQuantityAdjusted): existing_read_model = self.read_model_db.get_by_id(event.aggregate_id) if existing_read_model: existing_read_model.items[event.item_name] = event.new_quantity # For simplicity, recalculate total_amount in a real system self.read_model_db.save(existing_read_model) def handle_order_shipped(self, event: OrderShipped): existing_read_model = self.read_model_db.get_by_id(event.aggregate_id) if existing_read_model: existing_read_model.status = "SHIPPED" self.read_model_db.save(existing_read_model) # --- 7. Abfragen (Daten aus Read Model abrufen) --- class GetOrderByIdQuery: def __init__(self, order_id: str): self.order_id = order_id class GetPendingOrdersQuery: pass class GetOrderByIdQueryHandler: def __init__(self, read_model_db: OrderReadModelDatabase): self.read_model_db = read_model_db def handle(self, query: GetOrderByIdQuery) -> Optional[OrderReadModel]: return self.read_model_db.get_by_id(query.order_id) class GetPendingOrdersQueryHandler: def __init__(self, read_model_db: OrderReadModelDatabase): self.read_model_db = read_model_db def handle(self, query: GetPendingOrdersQuery) -> List[OrderReadModel]: return self.read_model_db.get_all_pending_orders() # --- Orchestrierung --- class Application: def __init__(self): self.event_store = EventStore() self.event_bus = EventBus() self.read_model_db = OrderReadModelDatabase() # Befehlshandler registrieren self.create_order_cmd_handler = CreateOrderCommandHandler(self.event_store, self.event_bus) self.adjust_item_cmd_handler = AdjustOrderItemQuantityCommandHandler(self.event_store, self.event_bus) self.ship_order_cmd_handler = ShipOrderCommandHandler(self.event_store, self.event_bus) # Anfragehandler registrieren self.get_order_by_id_query_handler = GetOrderByIdQueryHandler(self.read_model_db) self.get_pending_orders_query_handler = GetPendingOrdersQueryHandler(self.read_model_db) # Ereignishandler (Projektoren) registrieren, um das Read Model neu zu erstellen self.order_projector = OrderReadModelProjector(self.read_model_db) self.event_bus.subscribe(OrderCreated, self.order_projector.handle_order_created) self.event_bus.subscribe(OrderItemQuantityAdjusted, self.order_projector.handle_order_item_quantity_adjusted) self.event_bus.subscribe(OrderShipped, self.order_projector.handle_order_shipped) def execute_command(self, command: Command): if isinstance(command, CreateOrderCommand): return self.create_order_cmd_handler.handle(command) elif isinstance(command, AdjustOrderItemQuantityCommand): self.adjust_item_cmd_handler.handle(command) elif isinstance(command, ShipOrderCommand): self.ship_order_cmd_handler.handle(command) else: raise ValueError(f"Unknown command: {type(command)}") def execute_query(self, query): if isinstance(query, GetOrderByIdQuery): return self.get_order_by_id_query_handler.handle(query) elif isinstance(query, GetPendingOrdersQuery): return self.get_pending_orders_query_handler.handle(query) else: raise ValueError(f"Unknown query: {type(query)}") # --- Client-Nutzung --- if __name__ == "__main__": app = Application() # --- Befehlsseite --- print("--- Executing Commands ---") create_order_cmd = CreateOrderCommand( customer_id="cust-123", items={"Laptop": 1, "Mouse": 1}, total_amount=1200.00 ) order_id = app.execute_command(create_order_cmd) print(f"New Order Created with ID: {order_id}") adjust_item_cmd = AdjustOrderItemQuantityCommand( order_id=order_id, item_name="Mouse", new_quantity=2 ) app.execute_command(adjust_item_cmd) ship_order_cmd = ShipOrderCommand(order_id=order_id) app.execute_command(ship_order_cmd) # --- Abfrageseite --- print("\n--- Executing Queries ---") # Abfrage der spezifischen Bestellung query_order_by_id = GetOrderByIdQuery(order_id=order_id) order_details = app.execute_query(query_order_by_id) if order_details: print(f"Query Result for Order {order_details.id}:") print(f" Customer: {order_details.customer_id}") print(f" Items: {order_details.items}") print(f" Total: {order_details.total_amount}") print(f" Status: {order_details.status}") else: print(f"Order {order_id} not found in read model.") # Eine weitere Bestellung erstellen, um ausstehende Bestellungen zu demonstrieren new_order_id = app.execute_command(CreateOrderCommand("cust-456", {"Book": 3}, 75.00)) print(f"Another Order Created with ID: {new_order_id}") # Abfrage nach ausstehenden Bestellungen pending_orders = app.execute_query(GetPendingOrdersQuery()) print("\nPending Orders:") for order in pending_orders: print(f" Order ID: {order.id}, Customer: {order.customer_id}, Status: {order.status}")
Erläuterung des Codebeispiels:
DomainEvent
s: Repräsentieren unveränderliche Fakten darüber, was im System passiert ist (z. B.OrderCreated
,OrderItemQuantityAdjusted
).OrderAggregate
: Das Herzstück des Schreibmodells. Es ist verantwortlich für die Durchsetzung von Geschäftsregeln und die Erzeugung vonDomainEvent
s basierend auf Befehlen. Es ändert seinen Zustand nicht direkt, sondernwendet
Ereignisse an, um seinen Zustand wiederherzustellen. Es enthält eine Liste von_uncommitted_events
.Command
s: Einfache Datenstrukturen, die eine Absicht ausdrücken (z. B.CreateOrderCommand
,ShipOrderCommand
).CommandHandler
s: Nehmen einen Befehl entgegen, laden das Aggregat (stellen dessen Zustand aus vergangenen Ereignissen imEventStore
wieder her), führen die erforderliche Aktion für das Aggregat durch und speichern dann die neu erzeugten Ereignisse imEventStore
und veröffentlichen sie auf demEventBus
.EventStore
: Ein vereinfachter In-Memory-Speicher für Ereignisse. In einer echten Anwendung wäre dies eine persistente Datenbank (wie PostgreSQL, DynamoDB oder ein dedizierter Event Store).EventBus
: Ein einfacher Dispatcher, der Ereignisse an registrierte Handler veröffentlicht. In einem Produktionssystem könnte dies Kafka, RabbitMQ oder AWS SNS/SQS sein.OrderReadModel
: Eine denormalisierte, flache Projektion der Bestelldaten, optimiert für Anzeige und Abfragen.OrderReadModelDatabase
: Eine vereinfachte In-Memory-Datenbank für das Read Model. Dies wäre typischerweise eine NoSQL-Datenbank (MongoDB, Elasticsearch), eine stark denormalisierte relationale Tabelle oder ein Suchindex.OrderReadModelProjector
: VerbrauchtDomainEvent
s vomEventBus
und aktualisiert dieOrderReadModelDatabase
. Hier geschieht die Denormalisierung und die Optimierung des Read Models.Query
s: Einfache Datenstrukturen, die eine Informationsanfrage darstellen (z. B.GetOrderByIdQuery
).QueryHandler
s: Rufen Daten direkt aus derOrderReadModelDatabase
ab, ohne komplexe Geschäftslogik.
Dieses Setup trennt die Anliegen klar: Das OrderAggregate
ist allein verantwortlich für die Gewährleistung von Geschäftsregeln und Konsistenz während Schreibvorgängen, während ReadModelProjectors
unabhängig optimierte Ansichten für Abfragen erstellen.
Anwendungsfälle
CQRS ist keine Universallösung für alle Probleme, aber es glänzt in spezifischen komplexen Szenarien:
- Hochleistungsfähige Lese/Schreib-Systeme: Systeme mit signifikant unterschiedlichen Lese- und Schreiblasten, bei denen die unabhängige Skalierung jedes einzelnen entscheidend ist (z. B. Social-Media-Feeds, E-Commerce-Plattformen, IoT-Datenverarbeitung).
- Komplexe Domänenlogik: Wenn das Schreibmodell anspruchsvolle Geschäftsregeln und Integritätsprüfungen erfordert (oft gekoppelt mit Event Sourcing).
- Datenberichterstattung und Analytik: Erstellung spezialisierter Read Models, die für komplexe Berichtsabfragen optimiert sind, ohne die transaktionale Leistung zu beeinträchtigen.
- Integration mit externen Systemen: Bereitstellung maßgeschneiderter Read Models für verschiedene Konsumenten oder Integration mit Analysewerkzeugen von Drittanbietern.
- Ereignisgesteuerte Architekturen: Passt natürlich zu Microservices und ereignisgesteuerten Mustern, bei denen Dienste über Ereignisse kommunizieren.
- Auditing und Debugging: Event Sourcing, wenn es mit CQRS verwendet wird, bietet eine vollständige, zeitliche Aufzeichnung aller Änderungen, was für die Überprüfung, das Debugging und die Reproduktion vergangener Zustände von unschätzbarem Wert ist.
Fazit
Die Anwendung von Command Query Responsibility Segregation (CQRS) in Python bietet ein leistungsstarkes Architekturmuster zum Aufbau komplexer, skalierbarer und wartbarer Geschäftssysteme. Indem wir die Modelle, die für die Änderung von Daten verantwortlich sind, explizit von denen trennen, die für die Abfrage verantwortlich sind, eröffnen wir Möglichkeiten für unabhängige Optimierung, verbesserte Leistung und erhöhte Widerstandsfähigkeit. Obwohl es ein gewisses Maß an Komplexität einführt, machen die Vorteile in spezifischen Hochlast-Szenarien – von verbesserter Skalierbarkeit bis hin zu klarerer Domänenmodellierung – CQRS zu einem unverzichtbaren Werkzeug im Werkzeugkasten moderner Entwickler. Nutzen Sie CQRS, um Geschäftsanwendungen zu erstellen, die nicht nur leistungsfähig sind, sondern auch in der Lage sind, sich mit sich ständig ändernden Anforderungen elegant weiterzuentwickeln.