DjangoおよびFlaskでのCeleryを使用した非同期タスク処理
Olivia Novak
Dev Intern · Leapcell

はじめに
Web開発の世界では、ユーザーエクスペリエンスが最重要です。読み込みの遅いページや応答性の低いアプリケーションは、ユーザーの不満や離脱を招きかねません。多くの場合、アプリケーションでは時間のかかるタスクを実行する必要があります。画像処理、大量のメール送信、複雑なレポートの生成、外部APIとの連携などが考えられます。これらの操作がWebリクエストの一部として同期的に実行されると、アプリケーションのメインスレッドをブロックし、大幅な遅延と貧弱なユーザーエクスペリエンスを引き起こす可能性があります。そこで非同期タスクキューが登場し、これらの重いタスクをメインのリクエスト・レスポンスサイクルからオフロードできるようになります。この記事では、人気のあるPython WebフレームワークであるDjangoとFlaskにCelery、強力な分散タスクキューを統合して、そのような困難な操作をエレガントに処理し、アプリケーションの応答性を維持し、全体的なパフォーマンスを向上させる方法を探ります。
非同期操作のコアコンセプト
統合の詳細に入る前に、Celeryを使用した非同期タスクシステムの構築に関わる主要なコンポーネントと概念を共通理解として確立しましょう。
Celery: Celeryの心臓部は、分散メッセージパッシングに基づいた非同期タスクキュー/ジョブキューです。大量のメッセージを処理するように設計されており、操作に(ほぼリアルタイムの)リアルタイムコンポーネントを提供します。
Broker: Celeryは、メッセージを送受信するためのメッセージブローカーを必要とします。ブローカーは仲介者として機能し、実行する必要のあるタスクを格納し、それらをワーカーに配信します。一般的な選択肢には、RabbitMQ、Redis、Amazon SQSがあります。
Worker: Celeryワーカーは、メッセージブローカーで新しいタスクを継続的に監視する別のプロセスです。タスクを取得すると、ワーカーはそれを実行し、オプションで結果を格納します。タスク処理機能をスケーリングするために、複数のワーカーを実行できます。
Task: Celeryでは、タスクはワーカーによって非同期に実行される呼び出し可能なPython関数です。これらの関数は通常、アプリケーションで定義され、Celeryタスクとして認識されるようにデコレートされます。
Result Backend (オプション): タスクが完了した後、その結果またはステータスを取得したい場合があります。結果バックエンドは、この情報を格納します。一般的な選択肢には、Redis、SQLAlchemy、Django ORM、Memcachedがあります。
DjangoでのCeleryの実装
CeleryをDjangoプロジェクトに統合するには、いくつかの簡単な手順で、設定、タスクの定義、および必要なコンポーネントの実行を行います。
1. インストール
まず、Celeryと選択したブローカーをインストールします。この例では、Redisをブローカーおよび結果バックエンドとして使用します。
pip install celery redis
2. Djangoプロジェクトの設定
DjangoプロジェクトにCelery
インスタンスを作成します。通常、これはproj_name/celery.py
ファイルで行います。
# proj_name/celery.py import os from celery import Celery # 'celery'プログラムのデフォルトのDjango設定モジュールを設定する。 os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj_name.settings') app = Celery('proj_name') # ここで文字列を使用することは、ワーカーが設定オブジェクトを子プロセスにシリアル化する必要がないことを意味します。 # - namespace='CELERY' は、すべてのcelery関連の設定キーに `CELERY_` プレフィックスが付くことを意味します。 app.config_from_object('django.conf:settings', namespace='CELERY') # 登録済みのすべてのDjangoアプリ設定からタスクモジュールをロードする。 app.autodiscover_tasks() @app.task(bind=True, ignore_result=True) def debug_task(self): print(f'Request: {self.request!r}')
次に、Djangoが起動したときにロードされるように、プロジェクトの__init__.py
でこのCeleryアプリをインポートします。
# proj_name/__init__.py from .celery import app as celery_app
3. settings.py
でのCelery設定
Djangoプロジェクトのsettings.py
にCeleryの設定を追加します。
# proj_name/settings.py # ...既存の設定... CELERY_BROKER_URL = 'redis://localhost:6379/0' CELERY_RESULT_BACKEND = 'redis://localhost:6379/0' CELERY_ACCEPT_CONTENT = ['json'] CELERY_TASK_SERIALIZER = 'json' CELERY_RESULT_SERIALIZER = 'json' CELERY_TIMEZONE = 'Asia/Shanghai' # または希望するタイムゾーン
4. タスクの定義
非同期タスクを定義するために、Djangoアプリのいずれか(例:my_app/tasks.py
)にtasks.py
ファイルを作成します。
# my_app/tasks.py import time from celery import shared_task @shared_task def send_marketing_emails(user_ids): print(f"Starting to send emails to {len(user_ids)} users...") for user_id in user_ids: # 時間のかかるメール送信プロセスをシミュレートする time.sleep(2) print(f"Email sent to user {user_id}") return f"Finished sending emails to {len(user_ids)} users." @shared_task def generate_report(report_id): print(f"Generating report {report_id}...") time.sleep(10) # 長いレポート生成をシミュレートする print(f"Report {report_id} generated successfully!") return {"status": "completed", "report_id": report_id}
5. タスクの呼び出し
これらのタスクは、Djangoビューまたはアプリケーションの他の場所から呼び出すことができます。
# my_app/views.py from django.http import HttpResponse from .tasks import send_marketing_emails, generate_report def email_sending_view(request): user_ids_to_email = [1, 2, 3, 4, 5] # .delay() は .apply_async() のショートカットです send_marketing_emails.delay(user_ids_to_email) return HttpResponse("Email sending initiated. Check logs for progress.") def report_generation_view(request): report_task = generate_report.delay("monthly_sales") # 後でステータスを確認するためにタスクIDを取得できます return HttpResponse(f"Report generation initiated. Task ID: {report_task.id}") def check_report_status(request, task_id): from celery.result import AsyncResult result = AsyncResult(task_id) return HttpResponse(f"Task ID: {task_id}, Status: {result.status}, Result: {result.result}")
6. Celeryワーカーとブローカーの実行
実行中のRedisサーバー(または選択したブローカー)とCeleryワーカーが必要です。
Redisを起動する:
redis-server
DjangoプロジェクトのルートからCeleryワーカーを起動する:
celery -A proj_name worker -l info
-A proj_name
はDjangoプロジェクトを指定します。-l info
はログレベルを設定します。
CeleryとFlaskの統合
Flaskの場合も同様のプロセスで、アプリケーションコンテキストと設定に焦点を当てます。
1. インストール
pip install celery redis Flask
2. Flaskアプリケーションの設定
celery_app.py
を作成するか、app.py
に直接統合します。一般的なパターンは、Celeryアプリケーションファクトリを作成することです。
# app.py from flask import Flask, jsonify from celery import Celery import time def make_celery(app): celery = Celery( app.import_name, broker=app.config['CELERY_BROKER_URL'], backend=app.config['CELERY_RESULT_BACKEND'] ) celery.conf.update(app.config) class ContextTask(celery.Task): def __call__(self, *args, **kwargs): with app.app_context(): return self.run(*args, **kwargs) celery.Task = ContextTask return celery flask_app = Flask(__name__) flask_app.config.update( CELERY_BROKER_URL='redis://localhost:6379/0', CELERY_RESULT_BACKEND='redis://localhost:6379/0', CELERY_ACCEPT_CONTENT = ['json'], CELERY_TASK_SERIALIZER = 'json', CELERY_RESULT_SERIALIZER = 'json', CELERY_TIMEZONE = 'Asia/Shanghai' ) celery_app = make_celery(flask_app) # タスクを定義する @celery_app.task def long_running_task(x, y): print(f"Starting long running task with {x} and {y}...") time.sleep(5) result = x + y print(f"Task completed. Result: {result}") return result @celery_app.task(bind=True) def background_download(self, url): print(f"Downloading from {url}...") time.sleep(7) # ダウンロードの進捗をシミュレートする for i in range(1, 11): self.update_state(state='PROGRESS', meta={'current': i * 10, 'total': 100}) time.sleep(0.5) print(f"Finished downloading from {url}") return {"status": "success", "url": url} # Flask ルート @flask_app.route('/') def index(): return "Hello, Flask with Celery!" @flask_app.route('/start_task/<int:num1>/<int:num2>') def start_task(num1, num2): task = long_running_task.delay(num1, num2) return jsonify({"message": "Task started", "task_id": task.id}) @flask_app.route('/download/<path:url>') def start_download(url): task = background_download.delay(url) return jsonify({"message": "Download initiated", "task_id": task.id}) @flask_app.route('/check_task/<task_id>') def check_task(task_id): from celery.result import AsyncResult result = AsyncResult(task_id, app=celery_app) if result.state == 'PENDING': response = {'state': result.state, 'status': 'Pending...'} elif result.state == 'PROGRESS': response = {'state': result.state, 'status': 'Running...', 'data': result.info.get('current', 0)} elif result.state == 'SUCCESS': response = {'state': result.state, 'status': 'Completed', 'result': result.result} else: response = {'state': result.state, 'status': str(result.info)} # 何か問題が発生した return jsonify(response) if __name__ == '__main__': flask_app.run(debug=True)
3. Celeryワーカーとブローカーの実行
Redisを起動する:
redis-server
Celeryワーカーは通常、app.py
を含むディレクトリから起動します:
celery -A app.celery_app worker -l info
ここでは、-A app.celery_app
はapp.py
内の'celery_app'という名前のCeleryアプリインスタンスを指します。
アプリケーションシナリオ
Celeryは、非同期処理が有益な数多くのシナリオに適用できます。
- メール送信: ウェルカムメール、ニュースレター、パスワードリセットリンク、通知の送信。
- 画像/ビデオ処理: サムネイル生成、リサイジング、透かし、ビデオエンコーディング。
- レポート生成: コンパイルに時間がかかる複雑なPDF、CSV、Excelレポートの作成。
- データインポート/エクスポート: 処理のための大きなファイルのアップロードを処理したり、大きなデータダンプを生成したりします。
- API統合: 低速またはレート制限されている可能性のあるサードパーティAPIへのリクエストの作成。
- スケジュールされたタスク: Celery Beatを使用して、定期的な間隔でタスクを実行します(例:毎日のデータバックアップ、毎週の統計コンパイル)。
これらのタスクをオフロードすることにより、Webサーバーはユーザーリクエストへの迅速な応答に集中でき、はるかにスムーズで楽しいユーザーエクスペリエンスにつながります。
結論
CeleryをDjangoまたはFlaskアプリケーションに統合することは、時間のかかる操作を非同期に処理するための強力なパターンです。これらのタスクをメインのリクエスト・レスポンスサイクルから分離することにより、アプリケーションの応答性、スケーラビリティ、および全体的なユーザーエクスペリエンスを大幅に向上させます。セットアップは簡単で、アプリケーションのパフォーマンスと信頼性におけるメリットは大きく、堅牢なWeb開発に不可欠なツールとなっています。困難なタスクをオフロードすることは、流動的で応答性の高いユーザーインターフェイスを維持するための鍵となります。