Django ORM의 annotate 및 aggregate를 이용한 고급 데이터 집계 잠금 해제
Min-jun Kim
Dev Intern · Leapcell

소개
웹 개발, 특히 데이터 기반 애플리케이션의 세계에서는 방대한 데이터 세트에서 의미 있는 통찰력을 추출하는 능력이 가장 중요합니다. 기본적인 필터링과 정렬만으로도 충분할 때가 많지만, 실제 분석 요구 사항은 종종 더 정교한 데이터 변환, 즉 발생 횟수 계산, 평균 계산, 최대값 찾기, 특정 기준에 따른 결과 그룹화를 자주 요구합니다. 복잡한 SQL 쿼리를 직접 작성하는 것은 번거롭고 오류가 발생하기 쉬우며 ORM이 제공하는 우아한 추상화를 자주 방해합니다. Django ORM의 annotate 및 aggregate 함수는 바로 여기서 복잡한 데이터 집계 쿼리를 Pythonic하고 강력하게 구성할 수 있는 방법을 제공하며, 이는 효율적인 SQL로 직접 변환됩니다. 이러한 기능을 이해하고 활용하면 애플리케이션의 분석 능력을 크게 향상시켜 놀라운 용이성으로 더 풍부한 대시보드, 보고 도구 및 데이터 기반 기능을 구축할 수 있습니다. 이 블로그 게시물은 annotate 및 aggregate의 복잡성을 안내하여 복잡한 데이터 작업을 위한 잠재력을 최대한 발휘하는 방법을 보여줍니다.
고급 데이터 집계를 위한 핵심 개념
실제 예제에 들어가기 전에 Django ORM으로 고급 데이터 집계를 마스터하는 데 중요한 핵심 개념을 명확하게 이해해 보겠습니다.
ORM (객체 관계형 매퍼): ORM은 객체 지향 프로그래밍 언어를 사용하여 호환되지 않는 유형 시스템 간의 데이터를 변환하는 프로그래밍 기법입니다. Django에서 ORM을 사용하면 Python 객체를 사용하여 데이터베이스와 상호 작용할 수 있으므로 (대부분의 작업에 대해) 원시 SQL을 작성할 필요가 없습니다.
QuerySet: Django QuerySet은 데이터베이스 쿼리 모음을 나타냅니다. 지연 평가되므로 QuerySet이 실제로 반복되거나 평가될 때 (예: 목록으로 변환하거나 항목에 액세스하려고 할 때) 데이터베이스 히트가 발생합니다.
aggregate(): 이 함수는 전체 QuerySet에 걸쳐 집계된 값 (예: 총 개수, 평균, 합계)의 사전으로 반환됩니다. 이는 QuerySet을 단일 결과 (또는 여러 집계가 수행된 경우 단일 결과 세트)로 축소하는 "최종" 집계를 수행합니다. 동일한 QuerySet 체인 내에서 집계된 값에 대한 추가 작업을 허용하지 않습니다.
annotate(): aggregate()와 달리 annotate()는 QuerySet 내의 각 객체에 집계 값을 추가합니다. QuerySet의 각 항목에 대한 새 필드를 계산하며, 이는 필터링, 정렬 또는 추가 집계에 사용할 수 있습니다. 이는 결과를 그룹화하고 그룹별 계산을 수행하려는 경우 특히 유용합니다.
F() 표현식: F() 표현식을 사용하면 Python 변수가 아닌 데이터베이스 쿼리 내에서 모델 필드를 직접 참조할 수 있습니다. 이를 통해 동일한 모델의 두 가지 다른 필드와 관련된 작업이나 데이터베이스 수준에서 기존 필드 값에 기반한 계산을 수행할 수 있습니다. 예를 들어, start_date와 end_date 간의 차이를 계산합니다.
Q() 객체: Q() 객체는 복잡한 SQL WHERE 절을 캡슐화하는 데 사용됩니다. 논리 연산자 (& for AND, | for OR, ~ for NOT)를 사용하여 쿼리를 구축하고 다양한 조회 조건을 결합하여 필터링을 위한 간단한 키워드 인자보다 훨씬 더 많은 유연성을 제공합니다.
데이터베이스 함수: Django ORM은 (예: Avg, Count, Max, Min, Sum, Concat, TruncDate) 광범위한 기본 제공 데이터베이스 함수를 제공합니다. 이러한 함수는 annotate 및 aggregate와 함께 사용하여 데이터베이스 내에서 다양한 계산을 직접 수행할 수 있습니다. 사용자 지정 데이터베이스 함수도 정의할 수 있습니다.
복잡한 데이터 집계 구현
실제 예제를 통해 이러한 개념을 설명해 보겠습니다. 다음과 같은 단순화된 모델이 있는 전자 상거래 플랫폼을 위한 Django 애플리케이션이 있다고 가정해 보겠습니다.
# models.py from django.db import models from django.db.models import Sum, Count, Avg, F, ExpressionWrapper, DurationField, Q from django.utils import timezone class Customer(models.Model): name = models.CharField(max_length=100) email = models.EmailField(unique=True) registration_date = models.DateTimeField(auto_now_add=True) def __str__(self): return self.name class Product(models.Model): name = models.CharField(max_length=200) price = models.DecimalField(max_digits=10, decimal_places=2) stock = models.IntegerField(default=0) def __str__(self): return self.name class Order(models.Model): customer = models.ForeignKey(Customer, on_delete=models.CASCADE) order_date = models.DateTimeField(auto_now_add=True) is_completed = models.BooleanField(default=False) # A single order can have multiple items def __str__(self): return f"Order {self.id} by {self.customer.name}" class OrderItem(models.Model): order = models.ForeignKey(Order, on_delete=models.CASCADE, related_name='items') product = models.ForeignKey(Product, on_delete=models.CASCADE) quantity = models.PositiveIntegerField(default=1) price_at_purchase = models.DecimalField(max_digits=10, decimal_places=2) # Price can change @property def total_item_price(self): return self.quantity * self.price_at_purchase def save(self, *args, **kwargs): if not self.price_at_purchase: self.price_at_purchase = self.product.price super().save(*args, **kwargs) def __str__(self): return f"{self.quantity} x {self.product.name} for Order {self.order.id}"
이제 다양한 집계 시나리오를 살펴보겠습니다.
시나리오 1: aggregate()를 사용한 전역 집계
총 제품 수, 평균 제품 가격 및 완료된 주문의 총 수익을 찾고 싶다고 가정해 보겠습니다.
from django.db.models import Sum, Avg, Count # Total number of products total_products = Product.objects.aggregate(total_count=Count('id')) print(f"Total number of products: {total_products['total_count']}") # Average product price avg_price = Product.objects.aggregate(average_price=Avg('price')) print(f"Average product price: {avg_price['average_price']:.2f}") # Total revenue from all completed orders # We need to sum the total_item_price from OrderItem for completed orders total_revenue = OrderItem.objects.filter(order__is_completed=True) \ .aggregate(total_revenue=Sum(F('quantity') * F('price_at_purchase'))) print(f"Total revenue from completed orders: {total_revenue['total_revenue']:.2f}") # Multiple aggregations in one go product_stats = Product.objects.aggregate( total_products=Count('id'), average_price=Avg('price'), max_price=Max('price'), min_price=Min('price') ) print(f"Product Statistics: {product_stats}")
여기서 aggregate()는 지정된 함수에 따라 전체 데이터 세트 (또는 필터링된 하위 세트)를 요약하는 계산된 값이 포함된 사전으로 반환합니다.
시나리오 2: annotate()를 사용한 객체별 집계
이제 각 고객이 주문한 횟수와 총 지출액을 확인하려고 합니다. 이 작업은 고객별로 그룹화해야 하며, 여기서 annotate()가 빛을 발합니다.
# For each customer, count their orders and calculate their total spending customer_order_stats = Customer.objects.annotate( order_count=Count('order'), total_spent=Sum(F('order__items__quantity') * F('order__items__price_at_purchase')) ).order_by('-total_spent') # Order by customers who spent the most print("\nCustomer Order Statistics:") for customer in customer_order_stats: print(f"Customer: {customer.name}, Orders: {customer.order_count}, Total Spent: {customer.total_spent or 0:.2f}") # Note: `total_spent` might be None if a customer has no orders, hence 'or 0' for formatting.
이 예제에서는 annotate()가 각 Customer 객체에 order_count 및 total_spent를 새 속성으로 추가합니다. 이를 통해 Customer 인스턴스에서 이러한 집계 값에 직접 액세스할 수 있습니다.
시나리오 3: annotate()와 aggregate() 결합
annotate()가 먼저 중간 집계 필드를 생성하고 aggregate()가 이러한 주석 처리된 필드에 대해 최종 집계를 수행하는 더 복잡한 결과를 달성하기 위해 annotate()와 aggregate()를 연결할 수 있습니다.
완료된 주문당 평균 항목 수를 찾아 보겠습니다.
# First, annotate each completed order with its total number of items orders_with_item_counts = Order.objects.filter(is_completed=True).annotate( total_items=Sum('items__quantity') ) # Then, aggregate the average of these total_items across all completed orders average_items_per_completed_order = orders_with_item_counts.aggregate( avg_items=Avg('total_items') ) print(f"\nAverage items per completed order: {average_items_per_completed_order['avg_items'] or 0:.2f}")
여기서 annotate(total_items=Sum('items__quantity'))는 각 완료된 주문에 대한 총 항목 수를 계산합니다. 결과 QuerySet에는 각 Order 객체에 total_items라는 추가 필드가 있습니다. 그런 다음 aggregate(avg_items=Avg('total_items'))는 이러한 주석 처리된 Order 객체 전체에 걸쳐 이러한 total_items의 평균을 계산합니다.
시나리오 4: annotated 값에 대한 필터링 (Q() 및 F() 사용)
annotate()는 후속 필터링 또는 정렬에 사용할 수 있는 새 필드를 생성합니다. F() 표현식은 여러 필드와 관련된 계산을 수행할 때 필수적입니다. Q() 객체를 사용하면 조건부 필터링을 수행할 수 있습니다.
5개 이상의 주문을 했고 총 지출액이 1000달러를 초과하는 고객을 찾아 보겠습니다.
# Find customers with more than 5 orders and total_spent > 1000 high_value_customers = Customer.objects.annotate( order_count=Count('order'), total_spent=Sum(F('order__items__quantity') * F('order__items__price_at_purchase')) ).filter( Q(order_count__gt=5) & Q(total_spent__gt=1000) ).order_by('-total_spent') print("\nHigh-Value Customers:") for customer in high_value_customers: print(f"Customer: {customer.name}, Orders: {customer.order_count}, Total Spent: {customer.total_spent:.2f}")
이 쿼리는 먼저 Customer 객체를 주석 처리한 다음 Q() 객체를 사용하여 논리적 AND를 적용하여 새로 생성된 order_count 및 total_spent 주석을 기반으로 필터를 적용합니다.
시나리오 5: 날짜 기반 집계
특히 날짜와 관련된 Django의 데이터베이스 함수는 annotate()와 함께 사용하면 강력합니다. 월별 판매를 분석해 보겠습니다.
from django.db.models.functions import TruncMonth # Total revenue per month for completed orders monthly_revenue = Order.objects.filter(is_completed=True) \ .annotate(month=TruncMonth('order_date')) \ .values('month') \ .annotate(total_revenue=Sum(F('items__quantity') * F('items__price_at_purchase'))) .order_by('month') print("\nMonthly Revenue from Completed Orders:") for entry in monthly_revenue: print(f"Month: {entry['month'].strftime('%Y-%m')}, Revenue: {entry['total_revenue'] or 0:.2f}")
여기서 TruncMonth('order_date')는 order_date를 월의 시작 부분으로 자르고 주문을 월별로 그룹화합니다. 그런 다음 values('month')는 후속 Sum 집계가 월별로 수행되도록 합니다.
고급 사용 사례: 평균 주문 처리 시간 계산
이 예제를 위해 Order 모델에 completion_date 필드를 추가하고 주문 완료에 걸리는 평균 시간을 계산하려고 한다고 가정해 보겠습니다.
# Add a completion_date to Order model for this example # class Order(models.Model): # ... # completion_date = models.DateTimeField(null=True, blank=True) # For demonstration, assume some orders have completion_date set # For real data, you'd populate this when an order is completed. from django.db.models import ExpressionWrapper, DurationField from datetime import timedelta # Calculate the duration for each completed order orders_with_duration = Order.objects.filter(is_completed=True, completion_date__isnull=False).annotate( processing_duration=ExpressionWrapper( F('completion_date') - F('order_date'), output_field=DurationField() ) ) # Calculate the average duration average_processing_time = orders_with_duration.aggregate( avg_duration=Avg('processing_duration') ) if average_processing_time['avg_duration']: print(f"\nAverage order processing time: {average_processing_time['avg_duration']}") else: print("\nNo completed orders with processing duration available.")
ExpressionWrapper는 출력 유형이 명시적으로 지정된 (여기서는 DurationField) 데이터베이스 표현식을 정의하는 데 사용됩니다. 이를 통해 Django ORM은 데이터베이스 수준에서 날짜 시간 빼기를 올바르게 처리하여 평균화할 수 있는 기간 필드를 생성합니다.
결론
Django ORM의 annotate 및 aggregate 함수는 정교하고 데이터 중심적인 애플리케이션을 구축하는 데 필수적인 도구입니다. 둘의 차이점(annotate는 QuerySet의 각 항목에 필드를 추가하고, aggregate는 전체 QuerySet에 대한 단일 요약 사전 반환)을 이해하고 F() 표현식, Q() 객체 및 데이터베이스 함수와 결합하면 개발자는 Python 코드 내에서 강력하고 효율적인 데이터 집계 쿼리를 만들 수 있습니다. 이는 코드베이스를 깔끔하고 Pythonic하게 유지할 뿐만 아니라 데이터베이스의 기능을 활용하여 최적의 성능을 제공하며, 복잡한 분석 요구 사항을 우아하고 유지 관리 가능한 Django 코드로 변환합니다. 이러한 기능을 마스터하면 데이터에서 심오한 통찰력을 추출하고 더 똑똑하고 반응성이 뛰어난 애플리케이션을 구축할 수 있습니다.

