Django 面試準備 06-1:Celery 基礎概念

深入理解 Celery 異步任務隊列的核心架構與運作原理

06-1. Celery 基礎概念

Celery 是 Python 生態系中最流行的異步任務隊列,本章將深入探討其核心概念和運作原理。


1. 什麼是 Celery?

定義

Celery 是一個分布式任務隊列系統,用於處理異步任務和定時任務。

# 沒有 Celery 的同步處理
def create_order(request):
    order = Order.objects.create(...)
    send_email(order)  # 阻塞 2 秒
    generate_invoice(order)  # 阻塞 5 秒
    update_inventory(order)  # 阻塞 1 秒
    return JsonResponse({'order_id': order.id})
# 用戶等待:8 秒 ❌

# 使用 Celery 的異步處理
def create_order(request):
    order = Order.objects.create(...)
    send_email.delay(order.id)  # 立即返回
    generate_invoice.delay(order.id)  # 立即返回
    update_inventory.delay(order.id)  # 立即返回
    return JsonResponse({'order_id': order.id})
# 用戶等待:< 100ms ✅

核心用途

  1. 異步任務:耗時操作不阻塞用戶請求
  2. 定時任務:Cron-like 定期執行任務
  3. 分布式計算:跨多台服務器分散計算負載

2. Celery 架構

核心組件

┌──────────────────────────────────────────────┐
│              Django Application              │
│                                              │
│  create_order() → send_email.delay(...)      │ ← 生產者
└──────────────────┬───────────────────────────┘
                   ↓ 發送任務
         ┌─────────────────────┐
         │   Message Broker    │ ← 消息隊列
         │  (Redis / RabbitMQ) │
         └─────────────────────┘
                   ↓ 接收任務
         ┌─────────────────────┐
         │   Celery Workers    │ ← 消費者
         │  (異步執行任務)       │
         └─────────────────────┘
                   ↓ 保存結果
         ┌─────────────────────┐
         │   Result Backend    │ ← 結果存儲
         │  (Redis / Database) │
         └─────────────────────┘

1. Producer(生產者)

Django 應用,發送任務到隊列。

# views.py
from tasks import send_email

def create_order(request):
    # 發送任務到隊列
    task = send_email.delay(order_id=123)
    # task.id = 'a1b2c3d4-...'
    return JsonResponse({'task_id': task.id})

2. Message Broker(消息代理)

存儲任務隊列,支持 Redis、RabbitMQ 等。

# settings.py
CELERY_BROKER_URL = 'redis://localhost:6379/0'

# Broker 的作用:
# 1. 接收 Django 發送的任務
# 2. 將任務存儲在隊列中
# 3. 分發任務給 Worker
# 4. 保證任務不丟失(持久化)

3. Worker(工作進程)

執行任務的進程。

# 啟動 Celery Worker
celery -A myproject worker --loglevel=info

# 輸出:
# [tasks]
#   . tasks.send_email
#   . tasks.generate_invoice
#
# [2025-02-02 10:00:00] Ready to process tasks

4. Result Backend(結果後端)

存儲任務執行結果。

# settings.py
CELERY_RESULT_BACKEND = 'redis://localhost:6379/1'

# 查詢任務結果
from celery.result import AsyncResult

result = AsyncResult('a1b2c3d4-...')
print(result.state)  # 'SUCCESS'
print(result.result)  # 返回值

3. 安裝與配置

安裝

# 安裝 Celery
pip install celery

# 安裝 Redis(作為 Broker)
pip install redis

# 或安裝 RabbitMQ(需要先安裝 RabbitMQ 服務)
pip install amqp

Django 配置

步驟 1:創建 Celery 實例

# myproject/celery.py
from celery import Celery
import os

# 設置 Django settings 模組
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')

# 創建 Celery 實例
app = Celery('myproject')

# 從 Django settings 讀取配置
app.config_from_object('django.conf:settings', namespace='CELERY')

# 自動發現任務
app.autodiscover_tasks()

@app.task(bind=True)
def debug_task(self):
    print(f'Request: {self.request!r}')

步驟 2:在 init.py 中載入

# myproject/__init__.py
from .celery import app as celery_app

__all__ = ('celery_app',)

步驟 3:Django Settings 配置

# settings.py

# Celery 配置
CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/1'

# 時區設置
CELERY_TIMEZONE = 'Asia/Taipei'
CELERY_ENABLE_UTC = True

# 任務序列化格式
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT = ['json']

# 任務結果過期時間(秒)
CELERY_RESULT_EXPIRES = 3600  # 1 小時

# 任務執行時間限制
CELERY_TASK_TIME_LIMIT = 300  # 5 分鐘
CELERY_TASK_SOFT_TIME_LIMIT = 240  # 4 分鐘(軟限制,會拋出異常)

步驟 4:創建任務

# myapp/tasks.py
from celery import shared_task
from django.core.mail import send_mail

@shared_task
def send_email_task(order_id):
    """發送訂單確認郵件"""
    from .models import Order

    order = Order.objects.get(id=order_id)

    send_mail(
        subject=f'訂單 {order.id} 已確認',
        message=f'您的訂單總金額:${order.total}',
        from_email='noreply@example.com',
        recipient_list=[order.user.email],
    )

    return f'Email sent to {order.user.email}'

步驟 5:使用任務

# views.py
from .tasks import send_email_task

def create_order(request):
    # 創建訂單
    order = Order.objects.create(
        user=request.user,
        total=100
    )

    # 異步發送郵件
    send_email_task.delay(order.id)

    return JsonResponse({
        'order_id': order.id,
        'status': 'created'
    })

步驟 6:啟動 Worker

# 開發環境
celery -A myproject worker --loglevel=info

# 生產環境(使用 systemd)
# /etc/systemd/system/celery.service
[Unit]
Description=Celery Worker
After=network.target

[Service]
Type=forking
User=www-data
Group=www-data
WorkingDirectory=/var/www/myproject
ExecStart=/var/www/myproject/venv/bin/celery -A myproject worker \
    --loglevel=info \
    --pidfile=/var/run/celery/worker.pid \
    --logfile=/var/log/celery/worker.log

[Install]
WantedBy=multi-user.target

4. 任務的生命周期

完整流程

# 1. Django 發送任務
task = send_email_task.delay(order_id=123)
# ↓ 任務被序列化成 JSON
# {'task': 'myapp.tasks.send_email_task', 'args': [], 'kwargs': {'order_id': 123}}

# 2. 發送到 Broker(Redis)
# Redis 存儲任務到隊列:
# LPUSH celery '{"task": "myapp.tasks.send_email_task", ...}'

# 3. Worker 從 Broker 接收任務
# Worker 監聽隊列:
# BRPOP celery 0  # 阻塞等待任務

# 4. Worker 執行任務
# 反序列化任務 → 調用函數 → 獲取結果

# 5. 保存結果到 Backend(Redis)
# SET celery-task-meta-a1b2c3d4 '{"status": "SUCCESS", "result": "..."}'

# 6. Django 查詢結果
result = AsyncResult(task.id)
print(result.get(timeout=10))  # 阻塞等待結果

任務狀態

from celery import states

# 任務狀態轉換:
# PENDING → STARTED → SUCCESS / FAILURE / RETRY

# 查詢任務狀態
from celery.result import AsyncResult

task_id = 'a1b2c3d4-...'
result = AsyncResult(task_id)

print(result.state)  # 任務狀態

# 可能的狀態:
# - PENDING: 等待執行
# - STARTED: 已開始執行
# - SUCCESS: 成功完成
# - FAILURE: 執行失敗
# - RETRY: 正在重試
# - REVOKED: 已撤銷

5. 基本使用模式

模式 1:簡單異步任務

# tasks.py
from celery import shared_task
import time

@shared_task
def add(x, y):
    """簡單的加法任務"""
    time.sleep(2)  # 模擬耗時操作
    return x + y

# views.py
def calculate(request):
    x = int(request.GET['x'])
    y = int(request.GET['y'])

    # 異步執行
    task = add.delay(x, y)

    return JsonResponse({
        'task_id': task.id,
        'status': 'calculating'
    })

# 查詢結果
def get_result(request, task_id):
    result = AsyncResult(task_id)

    if result.ready():
        return JsonResponse({
            'status': 'completed',
            'result': result.result
        })
    else:
        return JsonResponse({
            'status': 'pending'
        })

模式 2:鏈式任務

# tasks.py
from celery import chain

@shared_task
def download_image(url):
    """下載圖片"""
    image_path = download(url)
    return image_path

@shared_task
def resize_image(image_path):
    """調整圖片大小"""
    resized_path = resize(image_path, size=(800, 600))
    return resized_path

@shared_task
def upload_to_s3(image_path):
    """上傳到 S3"""
    s3_url = upload(image_path)
    return s3_url

# views.py
def process_image(request):
    url = request.POST['url']

    # 鏈式執行:下載 → 調整大小 → 上傳
    workflow = chain(
        download_image.s(url),
        resize_image.s(),
        upload_to_s3.s()
    )

    result = workflow.apply_async()

    return JsonResponse({'task_id': result.id})

模式 3:並行任務

# tasks.py
from celery import group

@shared_task
def process_user(user_id):
    """處理單個用戶"""
    user = User.objects.get(id=user_id)
    # 進行某些處理
    return f'Processed user {user.id}'

# views.py
def process_all_users(request):
    user_ids = User.objects.values_list('id', flat=True)

    # 並行處理所有用戶
    job = group(process_user.s(user_id) for user_id in user_ids)
    result = job.apply_async()

    return JsonResponse({
        'task_id': result.id,
        'total_users': len(user_ids)
    })

    # 查詢結果
    # result.get()  # 等待所有任務完成
    # result.ready()  # 是否全部完成
    # result.successful()  # 是否全部成功

模式 4:任務重試

# tasks.py
from celery import shared_task
from requests.exceptions import RequestException

@shared_task(bind=True, max_retries=3)
def call_external_api(self, url):
    """呼叫外部 API(自動重試)"""
    try:
        response = requests.get(url, timeout=10)
        response.raise_for_status()
        return response.json()

    except RequestException as exc:
        # 5 秒後重試,最多重試 3 次
        raise self.retry(exc=exc, countdown=5)

# views.py
def fetch_data(request):
    url = request.GET['url']
    task = call_external_api.delay(url)
    return JsonResponse({'task_id': task.id})

6. Celery Worker 類型

Worker Pool 類型

# 1. prefork(預設,多進程)
celery -A myproject worker --pool=prefork --concurrency=4

# 適合:CPU 密集型任務
# 原理:使用多個進程,繞過 Python GIL

# 2. solo(單進程單線程)
celery -A myproject worker --pool=solo

# 適合:開發環境、簡單任務
# 原理:單個進程執行所有任務

# 3. threads(多線程)
celery -A myproject worker --pool=threads --concurrency=10

# 適合:I/O 密集型任務
# 原理:使用多個線程,受 GIL 限制

# 4. gevent(協程)
celery -A myproject worker --pool=gevent --concurrency=1000

# 適合:大量 I/O 密集型任務
# 原理:使用 greenlet 協程,類似 asyncio

Worker 數量計算

# CPU 密集型任務(prefork)
concurrency = CPU 核心數

# I/O 密集型任務(threads)
concurrency = CPU 核心數 × 4-10

# I/O 密集型任務(gevent)
concurrency = 100-1000  # 根據記憶體和任務類型調整

7. 常見配置選項

# settings.py

# ============= Broker 配置 =============
CELERY_BROKER_URL = 'redis://localhost:6379/0'

# Broker 連接重試
CELERY_BROKER_CONNECTION_RETRY_ON_STARTUP = True
CELERY_BROKER_CONNECTION_MAX_RETRIES = 10

# ============= Result Backend 配置 =============
CELERY_RESULT_BACKEND = 'redis://localhost:6379/1'

# 結果過期時間
CELERY_RESULT_EXPIRES = 3600  # 1 小時

# ============= 任務配置 =============
# 任務序列化
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT = ['json']

# 任務時間限制
CELERY_TASK_TIME_LIMIT = 300  # 5 分鐘硬限制
CELERY_TASK_SOFT_TIME_LIMIT = 240  # 4 分鐘軟限制

# 任務結果忽略(不保存結果)
CELERY_TASK_IGNORE_RESULT = False

# 任務確認機制
CELERY_TASK_ACKS_LATE = True  # 任務執行完才確認(防止任務丟失)

# 任務預取數量
CELERY_WORKER_PREFETCH_MULTIPLIER = 1  # Worker 一次只取 1 個任務

# ============= Worker 配置 =============
# Worker 最大任務數(之後重啟)
CELERY_WORKER_MAX_TASKS_PER_CHILD = 1000

# Worker 禁用速率限制
CELERY_WORKER_DISABLE_RATE_LIMITS = False

# ============= 監控配置 =============
# 啟用任務發送事件
CELERY_TASK_SEND_SENT_EVENT = True

# 啟用任務跟蹤
CELERY_TASK_TRACK_STARTED = True

8. 監控與管理

Flower(Web 監控工具)

# 安裝 Flower
pip install flower

# 啟動 Flower
celery -A myproject flower

# 訪問:http://localhost:5555
# 可以看到:
# - Worker 狀態
# - 任務列表(成功、失敗、執行中)
# - 任務詳情
# - 性能圖表

命令行工具

# 查看 Worker 狀態
celery -A myproject inspect active

# 查看已註冊的任務
celery -A myproject inspect registered

# 查看任務統計
celery -A myproject inspect stats

# 撤銷任務
celery -A myproject revoke task_id

# 清空隊列
celery -A myproject purge

在代碼中監控

# 查詢任務狀態
from celery.result import AsyncResult

result = AsyncResult(task_id)

print(f'Status: {result.state}')
print(f'Result: {result.result}')
print(f'Traceback: {result.traceback}')  # 如果失敗

# 檢查任務是否完成
if result.ready():
    print('Task completed')

# 等待任務完成(阻塞)
result.get(timeout=10)

面試常見問題

Q1:Celery 和多線程/多進程有什麼區別?

答案:

特性多線程/多進程Celery
解耦❌ 與主程序緊密耦合✅ 完全解耦,可獨立部署
故障恢復❌ 程序崩潰任務丟失✅ 任務持久化,不會丟失
分布式❌ 只能在單機運行✅ 可分散到多台服務器
監控❌ 難以監控✅ 有 Flower 等監控工具
定時任務⚠️ 需要自己實現✅ 內建 Celery Beat
重試機制⚠️ 需要自己實現✅ 內建重試機制

示例:

# 多線程:
import threading

def send_email(order_id):
    # 發送郵件
    pass

thread = threading.Thread(target=send_email, args=(123,))
thread.start()

# 問題:
# - 主程序退出,線程也會結束
# - 無法查詢任務狀態
# - 無法重試失敗的任務

# Celery:
from celery import shared_task

@shared_task
def send_email(order_id):
    # 發送郵件
    pass

task = send_email.delay(123)

# 優點:
# - 任務持久化,不會丟失
# - 可以查詢狀態:task.state
# - 自動重試:@shared_task(max_retries=3)

Q2:Celery 的 Broker 為什麼選擇 Redis 而不是 RabbitMQ?

答案:

特性RedisRabbitMQ
性能✅ 更快(內存)⚠️ 較慢(磁碟持久化)
可靠性⚠️ 較低(可能丟失消息)✅ 更高(持久化)
功能⚠️ 基本功能✅ 豐富(優先級、路由等)
易用性✅ 簡單⚠️ 複雜
內存占用⚠️ 較高✅ 較低

選擇建議:

# 使用 Redis:
# ✅ 小型應用(<10,000 任務/天)
# ✅ 任務丟失可以接受
# ✅ 團隊熟悉 Redis

# 使用 RabbitMQ:
# ✅ 大型應用(>100,000 任務/天)
# ✅ 任務絕對不能丟失(金融、支付)
# ✅ 需要複雜的路由和優先級

Q3:Celery 任務失敗後如何處理?

答案:

有 3 種處理策略:

1. 自動重試

@shared_task(bind=True, max_retries=3)
def unreliable_task(self, data):
    try:
        result = call_external_api(data)
        return result
    except Exception as exc:
        # 5 秒後重試
        raise self.retry(exc=exc, countdown=5)

2. 記錄失敗

from celery.signals import task_failure

@task_failure.connect
def handle_task_failure(sender, task_id, exception, **kwargs):
    """任務失敗時觸發"""
    # 記錄到日誌
    logger.error(f'Task {task_id} failed: {exception}')

    # 發送告警
    send_alert(f'Task failed: {sender.name}')

    # 保存到資料庫
    FailedTask.objects.create(
        task_id=task_id,
        task_name=sender.name,
        exception=str(exception)
    )

3. 死信隊列(Dead Letter Queue)

# 將失敗的任務移到專門的隊列
@shared_task(bind=True)
def risky_task(self, data):
    try:
        return process(data)
    except Exception as exc:
        # 重試 3 次後,移到死信隊列
        if self.request.retries >= 3:
            dead_letter_queue.apply_async(
                args=[self.request.id, data, str(exc)]
            )
        else:
            raise self.retry(exc=exc)

@shared_task
def dead_letter_queue(task_id, data, error):
    """處理失敗任務的隊列"""
    # 記錄失敗任務
    # 等待人工處理
    pass

小結

Celery 的核心概念:

  1. 架構組件:Producer、Broker、Worker、Result Backend
  2. 任務生命周期:發送 → 隊列 → 執行 → 結果
  3. 基本使用@shared_task 裝飾器 + .delay() 方法
  4. Worker 類型:prefork(CPU 密集)、threads/gevent(I/O 密集)
  5. 監控管理:Flower、命令行工具

關鍵優勢:

  • ✅ 異步處理,不阻塞用戶請求
  • ✅ 分布式部署,水平擴展
  • ✅ 任務持久化,不會丟失
  • ✅ 內建重試、監控等功能

下一章將通過實戰案例深入了解 Celery 的應用場景!

0%