Django 面試準備 06-1:Celery 基礎概念
深入理解 Celery 異步任務隊列的核心架構與運作原理
目錄
06-1. Celery 基礎概念
Celery 是 Python 生態系中最流行的異步任務隊列,本章將深入探討其核心概念和運作原理。
1. 什麼是 Celery?
定義
Celery 是一個分布式任務隊列系統,用於處理異步任務和定時任務。
# 沒有 Celery 的同步處理
def create_order(request):
order = Order.objects.create(...)
send_email(order) # 阻塞 2 秒
generate_invoice(order) # 阻塞 5 秒
update_inventory(order) # 阻塞 1 秒
return JsonResponse({'order_id': order.id})
# 用戶等待:8 秒 ❌
# 使用 Celery 的異步處理
def create_order(request):
order = Order.objects.create(...)
send_email.delay(order.id) # 立即返回
generate_invoice.delay(order.id) # 立即返回
update_inventory.delay(order.id) # 立即返回
return JsonResponse({'order_id': order.id})
# 用戶等待:< 100ms ✅核心用途
- 異步任務:耗時操作不阻塞用戶請求
- 定時任務:Cron-like 定期執行任務
- 分布式計算:跨多台服務器分散計算負載
2. Celery 架構
核心組件
┌──────────────────────────────────────────────┐
│ Django Application │
│ │
│ create_order() → send_email.delay(...) │ ← 生產者
└──────────────────┬───────────────────────────┘
↓ 發送任務
┌─────────────────────┐
│ Message Broker │ ← 消息隊列
│ (Redis / RabbitMQ) │
└─────────────────────┘
↓ 接收任務
┌─────────────────────┐
│ Celery Workers │ ← 消費者
│ (異步執行任務) │
└─────────────────────┘
↓ 保存結果
┌─────────────────────┐
│ Result Backend │ ← 結果存儲
│ (Redis / Database) │
└─────────────────────┘1. Producer(生產者)
Django 應用,發送任務到隊列。
# views.py
from tasks import send_email
def create_order(request):
# 發送任務到隊列
task = send_email.delay(order_id=123)
# task.id = 'a1b2c3d4-...'
return JsonResponse({'task_id': task.id})2. Message Broker(消息代理)
存儲任務隊列,支持 Redis、RabbitMQ 等。
# settings.py
CELERY_BROKER_URL = 'redis://localhost:6379/0'
# Broker 的作用:
# 1. 接收 Django 發送的任務
# 2. 將任務存儲在隊列中
# 3. 分發任務給 Worker
# 4. 保證任務不丟失(持久化)3. Worker(工作進程)
執行任務的進程。
# 啟動 Celery Worker
celery -A myproject worker --loglevel=info
# 輸出:
# [tasks]
# . tasks.send_email
# . tasks.generate_invoice
#
# [2025-02-02 10:00:00] Ready to process tasks4. Result Backend(結果後端)
存儲任務執行結果。
# settings.py
CELERY_RESULT_BACKEND = 'redis://localhost:6379/1'
# 查詢任務結果
from celery.result import AsyncResult
result = AsyncResult('a1b2c3d4-...')
print(result.state) # 'SUCCESS'
print(result.result) # 返回值3. 安裝與配置
安裝
# 安裝 Celery
pip install celery
# 安裝 Redis(作為 Broker)
pip install redis
# 或安裝 RabbitMQ(需要先安裝 RabbitMQ 服務)
pip install amqpDjango 配置
步驟 1:創建 Celery 實例
# myproject/celery.py
from celery import Celery
import os
# 設置 Django settings 模組
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')
# 創建 Celery 實例
app = Celery('myproject')
# 從 Django settings 讀取配置
app.config_from_object('django.conf:settings', namespace='CELERY')
# 自動發現任務
app.autodiscover_tasks()
@app.task(bind=True)
def debug_task(self):
print(f'Request: {self.request!r}')步驟 2:在 init.py 中載入
# myproject/__init__.py
from .celery import app as celery_app
__all__ = ('celery_app',)步驟 3:Django Settings 配置
# settings.py
# Celery 配置
CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/1'
# 時區設置
CELERY_TIMEZONE = 'Asia/Taipei'
CELERY_ENABLE_UTC = True
# 任務序列化格式
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT = ['json']
# 任務結果過期時間(秒)
CELERY_RESULT_EXPIRES = 3600 # 1 小時
# 任務執行時間限制
CELERY_TASK_TIME_LIMIT = 300 # 5 分鐘
CELERY_TASK_SOFT_TIME_LIMIT = 240 # 4 分鐘(軟限制,會拋出異常)步驟 4:創建任務
# myapp/tasks.py
from celery import shared_task
from django.core.mail import send_mail
@shared_task
def send_email_task(order_id):
"""發送訂單確認郵件"""
from .models import Order
order = Order.objects.get(id=order_id)
send_mail(
subject=f'訂單 {order.id} 已確認',
message=f'您的訂單總金額:${order.total}',
from_email='noreply@example.com',
recipient_list=[order.user.email],
)
return f'Email sent to {order.user.email}'步驟 5:使用任務
# views.py
from .tasks import send_email_task
def create_order(request):
# 創建訂單
order = Order.objects.create(
user=request.user,
total=100
)
# 異步發送郵件
send_email_task.delay(order.id)
return JsonResponse({
'order_id': order.id,
'status': 'created'
})步驟 6:啟動 Worker
# 開發環境
celery -A myproject worker --loglevel=info
# 生產環境(使用 systemd)
# /etc/systemd/system/celery.service
[Unit]
Description=Celery Worker
After=network.target
[Service]
Type=forking
User=www-data
Group=www-data
WorkingDirectory=/var/www/myproject
ExecStart=/var/www/myproject/venv/bin/celery -A myproject worker \
--loglevel=info \
--pidfile=/var/run/celery/worker.pid \
--logfile=/var/log/celery/worker.log
[Install]
WantedBy=multi-user.target4. 任務的生命周期
完整流程
# 1. Django 發送任務
task = send_email_task.delay(order_id=123)
# ↓ 任務被序列化成 JSON
# {'task': 'myapp.tasks.send_email_task', 'args': [], 'kwargs': {'order_id': 123}}
# 2. 發送到 Broker(Redis)
# Redis 存儲任務到隊列:
# LPUSH celery '{"task": "myapp.tasks.send_email_task", ...}'
# 3. Worker 從 Broker 接收任務
# Worker 監聽隊列:
# BRPOP celery 0 # 阻塞等待任務
# 4. Worker 執行任務
# 反序列化任務 → 調用函數 → 獲取結果
# 5. 保存結果到 Backend(Redis)
# SET celery-task-meta-a1b2c3d4 '{"status": "SUCCESS", "result": "..."}'
# 6. Django 查詢結果
result = AsyncResult(task.id)
print(result.get(timeout=10)) # 阻塞等待結果任務狀態
from celery import states
# 任務狀態轉換:
# PENDING → STARTED → SUCCESS / FAILURE / RETRY
# 查詢任務狀態
from celery.result import AsyncResult
task_id = 'a1b2c3d4-...'
result = AsyncResult(task_id)
print(result.state) # 任務狀態
# 可能的狀態:
# - PENDING: 等待執行
# - STARTED: 已開始執行
# - SUCCESS: 成功完成
# - FAILURE: 執行失敗
# - RETRY: 正在重試
# - REVOKED: 已撤銷5. 基本使用模式
模式 1:簡單異步任務
# tasks.py
from celery import shared_task
import time
@shared_task
def add(x, y):
"""簡單的加法任務"""
time.sleep(2) # 模擬耗時操作
return x + y
# views.py
def calculate(request):
x = int(request.GET['x'])
y = int(request.GET['y'])
# 異步執行
task = add.delay(x, y)
return JsonResponse({
'task_id': task.id,
'status': 'calculating'
})
# 查詢結果
def get_result(request, task_id):
result = AsyncResult(task_id)
if result.ready():
return JsonResponse({
'status': 'completed',
'result': result.result
})
else:
return JsonResponse({
'status': 'pending'
})模式 2:鏈式任務
# tasks.py
from celery import chain
@shared_task
def download_image(url):
"""下載圖片"""
image_path = download(url)
return image_path
@shared_task
def resize_image(image_path):
"""調整圖片大小"""
resized_path = resize(image_path, size=(800, 600))
return resized_path
@shared_task
def upload_to_s3(image_path):
"""上傳到 S3"""
s3_url = upload(image_path)
return s3_url
# views.py
def process_image(request):
url = request.POST['url']
# 鏈式執行:下載 → 調整大小 → 上傳
workflow = chain(
download_image.s(url),
resize_image.s(),
upload_to_s3.s()
)
result = workflow.apply_async()
return JsonResponse({'task_id': result.id})模式 3:並行任務
# tasks.py
from celery import group
@shared_task
def process_user(user_id):
"""處理單個用戶"""
user = User.objects.get(id=user_id)
# 進行某些處理
return f'Processed user {user.id}'
# views.py
def process_all_users(request):
user_ids = User.objects.values_list('id', flat=True)
# 並行處理所有用戶
job = group(process_user.s(user_id) for user_id in user_ids)
result = job.apply_async()
return JsonResponse({
'task_id': result.id,
'total_users': len(user_ids)
})
# 查詢結果
# result.get() # 等待所有任務完成
# result.ready() # 是否全部完成
# result.successful() # 是否全部成功模式 4:任務重試
# tasks.py
from celery import shared_task
from requests.exceptions import RequestException
@shared_task(bind=True, max_retries=3)
def call_external_api(self, url):
"""呼叫外部 API(自動重試)"""
try:
response = requests.get(url, timeout=10)
response.raise_for_status()
return response.json()
except RequestException as exc:
# 5 秒後重試,最多重試 3 次
raise self.retry(exc=exc, countdown=5)
# views.py
def fetch_data(request):
url = request.GET['url']
task = call_external_api.delay(url)
return JsonResponse({'task_id': task.id})6. Celery Worker 類型
Worker Pool 類型
# 1. prefork(預設,多進程)
celery -A myproject worker --pool=prefork --concurrency=4
# 適合:CPU 密集型任務
# 原理:使用多個進程,繞過 Python GIL
# 2. solo(單進程單線程)
celery -A myproject worker --pool=solo
# 適合:開發環境、簡單任務
# 原理:單個進程執行所有任務
# 3. threads(多線程)
celery -A myproject worker --pool=threads --concurrency=10
# 適合:I/O 密集型任務
# 原理:使用多個線程,受 GIL 限制
# 4. gevent(協程)
celery -A myproject worker --pool=gevent --concurrency=1000
# 適合:大量 I/O 密集型任務
# 原理:使用 greenlet 協程,類似 asyncioWorker 數量計算
# CPU 密集型任務(prefork)
concurrency = CPU 核心數
# I/O 密集型任務(threads)
concurrency = CPU 核心數 × 4-10
# I/O 密集型任務(gevent)
concurrency = 100-1000 # 根據記憶體和任務類型調整7. 常見配置選項
# settings.py
# ============= Broker 配置 =============
CELERY_BROKER_URL = 'redis://localhost:6379/0'
# Broker 連接重試
CELERY_BROKER_CONNECTION_RETRY_ON_STARTUP = True
CELERY_BROKER_CONNECTION_MAX_RETRIES = 10
# ============= Result Backend 配置 =============
CELERY_RESULT_BACKEND = 'redis://localhost:6379/1'
# 結果過期時間
CELERY_RESULT_EXPIRES = 3600 # 1 小時
# ============= 任務配置 =============
# 任務序列化
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT = ['json']
# 任務時間限制
CELERY_TASK_TIME_LIMIT = 300 # 5 分鐘硬限制
CELERY_TASK_SOFT_TIME_LIMIT = 240 # 4 分鐘軟限制
# 任務結果忽略(不保存結果)
CELERY_TASK_IGNORE_RESULT = False
# 任務確認機制
CELERY_TASK_ACKS_LATE = True # 任務執行完才確認(防止任務丟失)
# 任務預取數量
CELERY_WORKER_PREFETCH_MULTIPLIER = 1 # Worker 一次只取 1 個任務
# ============= Worker 配置 =============
# Worker 最大任務數(之後重啟)
CELERY_WORKER_MAX_TASKS_PER_CHILD = 1000
# Worker 禁用速率限制
CELERY_WORKER_DISABLE_RATE_LIMITS = False
# ============= 監控配置 =============
# 啟用任務發送事件
CELERY_TASK_SEND_SENT_EVENT = True
# 啟用任務跟蹤
CELERY_TASK_TRACK_STARTED = True8. 監控與管理
Flower(Web 監控工具)
# 安裝 Flower
pip install flower
# 啟動 Flower
celery -A myproject flower
# 訪問:http://localhost:5555
# 可以看到:
# - Worker 狀態
# - 任務列表(成功、失敗、執行中)
# - 任務詳情
# - 性能圖表命令行工具
# 查看 Worker 狀態
celery -A myproject inspect active
# 查看已註冊的任務
celery -A myproject inspect registered
# 查看任務統計
celery -A myproject inspect stats
# 撤銷任務
celery -A myproject revoke task_id
# 清空隊列
celery -A myproject purge在代碼中監控
# 查詢任務狀態
from celery.result import AsyncResult
result = AsyncResult(task_id)
print(f'Status: {result.state}')
print(f'Result: {result.result}')
print(f'Traceback: {result.traceback}') # 如果失敗
# 檢查任務是否完成
if result.ready():
print('Task completed')
# 等待任務完成(阻塞)
result.get(timeout=10)面試常見問題
Q1:Celery 和多線程/多進程有什麼區別?
答案:
| 特性 | 多線程/多進程 | Celery |
|---|---|---|
| 解耦 | ❌ 與主程序緊密耦合 | ✅ 完全解耦,可獨立部署 |
| 故障恢復 | ❌ 程序崩潰任務丟失 | ✅ 任務持久化,不會丟失 |
| 分布式 | ❌ 只能在單機運行 | ✅ 可分散到多台服務器 |
| 監控 | ❌ 難以監控 | ✅ 有 Flower 等監控工具 |
| 定時任務 | ⚠️ 需要自己實現 | ✅ 內建 Celery Beat |
| 重試機制 | ⚠️ 需要自己實現 | ✅ 內建重試機制 |
示例:
# 多線程:
import threading
def send_email(order_id):
# 發送郵件
pass
thread = threading.Thread(target=send_email, args=(123,))
thread.start()
# 問題:
# - 主程序退出,線程也會結束
# - 無法查詢任務狀態
# - 無法重試失敗的任務
# Celery:
from celery import shared_task
@shared_task
def send_email(order_id):
# 發送郵件
pass
task = send_email.delay(123)
# 優點:
# - 任務持久化,不會丟失
# - 可以查詢狀態:task.state
# - 自動重試:@shared_task(max_retries=3)Q2:Celery 的 Broker 為什麼選擇 Redis 而不是 RabbitMQ?
答案:
| 特性 | Redis | RabbitMQ |
|---|---|---|
| 性能 | ✅ 更快(內存) | ⚠️ 較慢(磁碟持久化) |
| 可靠性 | ⚠️ 較低(可能丟失消息) | ✅ 更高(持久化) |
| 功能 | ⚠️ 基本功能 | ✅ 豐富(優先級、路由等) |
| 易用性 | ✅ 簡單 | ⚠️ 複雜 |
| 內存占用 | ⚠️ 較高 | ✅ 較低 |
選擇建議:
# 使用 Redis:
# ✅ 小型應用(<10,000 任務/天)
# ✅ 任務丟失可以接受
# ✅ 團隊熟悉 Redis
# 使用 RabbitMQ:
# ✅ 大型應用(>100,000 任務/天)
# ✅ 任務絕對不能丟失(金融、支付)
# ✅ 需要複雜的路由和優先級Q3:Celery 任務失敗後如何處理?
答案:
有 3 種處理策略:
1. 自動重試
@shared_task(bind=True, max_retries=3)
def unreliable_task(self, data):
try:
result = call_external_api(data)
return result
except Exception as exc:
# 5 秒後重試
raise self.retry(exc=exc, countdown=5)2. 記錄失敗
from celery.signals import task_failure
@task_failure.connect
def handle_task_failure(sender, task_id, exception, **kwargs):
"""任務失敗時觸發"""
# 記錄到日誌
logger.error(f'Task {task_id} failed: {exception}')
# 發送告警
send_alert(f'Task failed: {sender.name}')
# 保存到資料庫
FailedTask.objects.create(
task_id=task_id,
task_name=sender.name,
exception=str(exception)
)3. 死信隊列(Dead Letter Queue)
# 將失敗的任務移到專門的隊列
@shared_task(bind=True)
def risky_task(self, data):
try:
return process(data)
except Exception as exc:
# 重試 3 次後,移到死信隊列
if self.request.retries >= 3:
dead_letter_queue.apply_async(
args=[self.request.id, data, str(exc)]
)
else:
raise self.retry(exc=exc)
@shared_task
def dead_letter_queue(task_id, data, error):
"""處理失敗任務的隊列"""
# 記錄失敗任務
# 等待人工處理
pass小結
Celery 的核心概念:
- 架構組件:Producer、Broker、Worker、Result Backend
- 任務生命周期:發送 → 隊列 → 執行 → 結果
- 基本使用:
@shared_task裝飾器 +.delay()方法 - Worker 類型:prefork(CPU 密集)、threads/gevent(I/O 密集)
- 監控管理:Flower、命令行工具
關鍵優勢:
- ✅ 異步處理,不阻塞用戶請求
- ✅ 分布式部署,水平擴展
- ✅ 任務持久化,不會丟失
- ✅ 內建重試、監控等功能
下一章將通過實戰案例深入了解 Celery 的應用場景!