03-3. 案例:混合型應用

既有 I/O 又有 CPU 計算的應用如何配置

🎯 本章重點

學習如何為既有 I/O 操作又有 CPU 計算的混合型應用選擇最佳配置。

📊 什麼是混合型應用?

定義

混合型應用:既有大量 I/O 等待,又有相當比例的 CPU 計算。

一個請求的時間分配(混合型):

總時間:1000ms
├── I/O 等待:600ms (60%)      ← 主要部分
│   ├── 資料庫查詢:300ms
│   ├── Redis 讀取:100ms
│   └── API 調用:200ms
│
└── CPU 計算:400ms (40%)      ← 不可忽視
    ├── 數據處理:150ms
    ├── JSON 序列化:100ms
    ├── 業務邏輯計算:100ms
    └── 模板渲染:50ms

與純 I/O、純 CPU 的對比:

純 I/O 密集型:
├── I/O 等待:95%
└── CPU 計算:5%
→ 使用 ASGI (Uvicorn Worker)

純 CPU 密集型:
├── CPU 計算:95%
└── I/O 等待:5%
→ 使用 Sync Worker

混合型:
├── I/O 等待:50-70%
└── CPU 計算:30-50%
→ 需要權衡選擇!

典型特徵

# 混合型應用的特徵

 既有 I/O 操作
   - 查詢資料庫中等複雜度
   - 調用外部 API少量
   - 讀寫 Cache

 也有 CPU 計算
   - 數據處理和轉換
   - 複雜業務邏輯
   - JSON 序列化大量數據
   - 模板渲染
   - 輕量級圖片處理

 時間分配特徵
   - I/O 等待50-70%
   - CPU 計算30-50%
   - 兩者都不可忽視

 不是混合型
   - 只查資料庫返回  純 I/O
   - 只做複雜計算  純 CPU

🏗️ 典型場景

場景 1:電商商品列表頁

# myapp/views.py
from django.db.models import Q, Prefetch
from django.core.cache import cache
from django.http import JsonResponse
import json

def product_list_view(request):
    """
    商品列表頁 - 混合型應用

    時間分配:
    1. 查詢商品(資料庫):200ms (I/O)
    2. 查詢庫存(Redis):50ms (I/O)
    3. 計算折扣價格:100ms (CPU)
    4. 數據過濾和排序:80ms (CPU)
    5. JSON 序列化(1000 商品):120ms (CPU)
    6. 查詢推薦商品(API):150ms (I/O)

    總時間:700ms
    I/O 時間:400ms (57%)
    CPU 時間:300ms (43%)
    """
    # I/O:查詢商品
    products = Product.objects.select_related(
        'category', 'brand'
    ).prefetch_related(
        'images', 'reviews'
    ).filter(
        is_active=True
    )[:1000]

    # I/O:批量查詢庫存
    product_ids = [p.id for p in products]
    stock_data = cache.get_many([f'stock:{pid}' for pid in product_ids])

    # CPU:計算折扣價格(業務邏輯)
    result = []
    for product in products:
        # CPU 密集型:計算折扣
        discount = calculate_discount(product)
        final_price = product.price * (1 - discount)

        # CPU 密集型:數據轉換
        result.append({
            'id': product.id,
            'name': product.name,
            'price': float(product.price),
            'final_price': float(final_price),
            'discount': float(discount),
            'stock': stock_data.get(f'stock:{product.id}', 0),
            'category': product.category.name,
            'brand': product.brand.name,
            'images': [img.url for img in product.images.all()],
            'avg_rating': calculate_average_rating(product.reviews.all()),
        })

    # CPU:數據過濾和排序
    result = [p for p in result if p['stock'] > 0]
    result = sorted(result, key=lambda x: x['final_price'])

    # I/O:獲取推薦商品(外部 API)
    recommendations = fetch_recommendations(request.user.id)

    # CPU:JSON 序列化(1000 個商品)
    return JsonResponse({
        'products': result,
        'recommendations': recommendations,
        'total': len(result),
    })


def calculate_discount(product):
    """CPU 密集型:複雜的折扣計算"""
    base_discount = 0.1

    # VIP 折扣
    if product.is_vip_product:
        base_discount += 0.05

    # 季節性折扣
    from datetime import datetime
    if datetime.now().month in [11, 12]:  # 雙11、雙12
        base_discount += 0.15

    # 庫存折扣
    stock = cache.get(f'stock:{product.id}', 100)
    if stock < 10:
        base_discount += 0.2  # 清倉大折扣

    return min(base_discount, 0.5)  # 最多 50% 折扣


def calculate_average_rating(reviews):
    """CPU 密集型:計算平均評分"""
    if not reviews:
        return 0.0

    total = sum(r.rating for r in reviews)
    count = len(reviews)

    # 加權計算(新評論權重更高)
    weighted_total = 0
    weighted_count = 0
    for review in reviews:
        age_days = (datetime.now() - review.created_at).days
        weight = 1.0 / (1 + age_days / 30)  # 每 30 天權重減半
        weighted_total += review.rating * weight
        weighted_count += weight

    return weighted_total / weighted_count if weighted_count > 0 else 0.0

時間分析:

I/O 操作(400ms):
├── 查詢商品(含 select_related):200ms
├── 批量查詢庫存(Redis):50ms
└── 獲取推薦(API):150ms

CPU 操作(300ms):
├── 計算折扣(1000 次迴圈):100ms
├── 數據過濾和排序:80ms
└── JSON 序列化:120ms

比例:I/O 57% vs CPU 43%
結論:混合型!

場景 2:數據報表生成

# myapp/views.py
import pandas as pd
from django.http import JsonResponse
from django.db.models import Sum, Count, Avg
from datetime import datetime, timedelta

def sales_report_view(request):
    """
    銷售報表生成 - 混合型

    時間分配:
    1. 查詢訂單數據:300ms (I/O)
    2. 查詢用戶數據:150ms (I/O)
    3. Pandas 數據處理:400ms (CPU)
    4. 統計計算:200ms (CPU)
    5. 生成圖表數據:150ms (CPU)

    總時間:1200ms
    I/O 時間:450ms (37.5%)
    CPU 時間:750ms (62.5%)
    """
    start_date = datetime.now() - timedelta(days=30)

    # I/O:查詢訂單數據(複雜查詢)
    orders = Order.objects.select_related(
        'user', 'product'
    ).filter(
        created_at__gte=start_date
    ).values(
        'id', 'user__id', 'user__name',
        'product__name', 'amount', 'created_at'
    )

    # I/O:查詢聚合數據
    stats = Order.objects.filter(
        created_at__gte=start_date
    ).aggregate(
        total_amount=Sum('amount'),
        total_orders=Count('id'),
        avg_amount=Avg('amount')
    )

    # CPU:轉換為 Pandas DataFrame
    df = pd.DataFrame(list(orders))

    # CPU:數據處理
    # 1. 日期處理
    df['date'] = pd.to_datetime(df['created_at']).dt.date

    # 2. 按日統計(CPU 密集)
    daily_stats = df.groupby('date').agg({
        'amount': ['sum', 'mean', 'count'],
        'user__id': 'nunique'
    }).reset_index()

    # 3. 計算移動平均(CPU 密集)
    daily_stats['ma_7'] = daily_stats[('amount', 'sum')].rolling(
        window=7
    ).mean()

    # 4. 計算增長率(CPU 密集)
    daily_stats['growth_rate'] = daily_stats[('amount', 'sum')].pct_change()

    # 5. 用戶分析(CPU 密集)
    user_stats = df.groupby('user__id').agg({
        'amount': 'sum',
        'id': 'count'
    }).reset_index()
    user_stats.columns = ['user_id', 'total_amount', 'order_count']

    # CPU:生成圖表數據
    chart_data = generate_chart_data(daily_stats, user_stats)

    # CPU:JSON 序列化(大量數據)
    return JsonResponse({
        'summary': {
            'total_amount': float(stats['total_amount']),
            'total_orders': stats['total_orders'],
            'avg_amount': float(stats['avg_amount']),
        },
        'daily_stats': daily_stats.to_dict('records'),
        'user_stats': user_stats.head(100).to_dict('records'),
        'charts': chart_data,
    })

場景 3:內容推薦系統

# myapp/views.py
from django.contrib.auth.decorators import login_required
from django.http import JsonResponse
import numpy as np

@login_required
def recommendation_view(request):
    """
    內容推薦 - 混合型

    時間分配:
    1. 查詢用戶行為:150ms (I/O)
    2. 查詢內容資料:200ms (I/O)
    3. 計算相似度:300ms (CPU) ← 矩陣運算
    4. 排序和過濾:100ms (CPU)
    5. 查詢推薦理由:100ms (I/O)

    總時間:850ms
    I/O 時間:450ms (53%)
    CPU 時間:400ms (47%)
    """
    user_id = request.user.id

    # I/O:查詢用戶行為歷史
    user_history = UserBehavior.objects.filter(
        user_id=user_id
    ).values('content_id', 'action_type', 'score')

    # I/O:查詢候選內容
    candidate_contents = Content.objects.filter(
        is_active=True
    ).exclude(
        id__in=[h['content_id'] for h in user_history]
    )[:500]

    # CPU:構建用戶向量(數值計算)
    user_vector = build_user_vector(user_history)

    # CPU:批量計算相似度(矩陣運算,CPU 密集)
    scores = []
    for content in candidate_contents:
        # CPU:向量計算
        content_vector = build_content_vector(content)
        similarity = cosine_similarity(user_vector, content_vector)

        scores.append({
            'content_id': content.id,
            'title': content.title,
            'score': similarity,
        })

    # CPU:排序(大量數據)
    scores.sort(key=lambda x: x['score'], reverse=True)
    top_recommendations = scores[:20]

    # I/O:查詢推薦理由(為什麼推薦這些內容)
    reasons = fetch_recommendation_reasons(
        user_id,
        [r['content_id'] for r in top_recommendations]
    )

    return JsonResponse({
        'recommendations': top_recommendations,
        'reasons': reasons,
    })


def cosine_similarity(vec1, vec2):
    """CPU 密集型:餘弦相似度計算"""
    dot_product = np.dot(vec1, vec2)
    norm_a = np.linalg.norm(vec1)
    norm_b = np.linalg.norm(vec2)
    return dot_product / (norm_a * norm_b) if norm_a and norm_b else 0.0

⚙️ 混合型應用的配置策略

方案對比

混合型應用有三種主要配置方案:

方案 1:ASGI (Uvicorn) - 偏向 I/O

# gunicorn.conf.py
import multiprocessing

bind = "0.0.0.0:8000"
workers = multiprocessing.cpu_count()  # 4 核 = 4 workers
worker_class = "uvicorn.workers.UvicornWorker"

# 適合:I/O 占比 > 60%
# 例如:電商列表頁、API 網關

優點:

  • I/O 等待時可處理其他請求
  • 並發處理能力強
  • Workers 數量少,節省記憶體

缺點:

  • CPU 計算會阻塞事件循環
  • CPU 部分性能不如 Sync

適用場景:

  • I/O 占比 60-80%
  • CPU 計算相對簡單
  • 需要高並發

方案 2:Gthread - 平衡方案

# gunicorn.conf.py
import multiprocessing

bind = "0.0.0.0:8000"
workers = multiprocessing.cpu_count()  # 4 核 = 4 workers
worker_class = "gthread"
threads = 4  # 每個 worker 4 個線程

# 總並發:4 workers × 4 threads = 16

# 適合:I/O 和 CPU 各占 50%
# 例如:數據報表、內容推薦

優點:

  • 結合進程和線程的優勢
  • 可處理一定並發(16 個線程)
  • CPU 計算性能比 ASGI 好

缺點:

  • 受 Python GIL 限制(線程共享 GIL)
  • 並發能力不如 ASGI
  • 配置較複雜

適用場景:

  • I/O 和 CPU 各占 50%
  • 中等並發需求
  • 平衡型應用

方案 3:Sync + 增加 Workers - 偏向 CPU

# gunicorn.conf.py
import multiprocessing

bind = "0.0.0.0:8000"
workers = int(multiprocessing.cpu_count() * 1.5)  # 4 核 = 6 workers
worker_class = "sync"

# 適合:CPU 占比 > 60%
# 但有一定 I/O 等待

優點:

  • CPU 計算性能最好
  • 簡單穩定
  • 無 GIL 限制(多進程)

缺點:

  • I/O 等待時 worker 閒置
  • Workers 多,記憶體占用大
  • 並發能力有限

適用場景:

  • CPU 占比 60-80%
  • I/O 相對簡單
  • 不需要極高並發

配置決策樹

混合型應用配置選擇:

1. 先判斷 I/O vs CPU 比例

I/O > 70%
├─ 使用 ASGI (Uvicorn)
├─ workers = CPU 核心數
└─ 最高並發能力

I/O 60-70%
├─ 使用 Gthread
├─ workers = CPU 核心數
├─ threads = 4
└─ 平衡方案

I/O 50-60%
├─ 看並發需求
│  ├─ 高並發 → Gthread
│  └─ 中低並發 → Sync + 1.5x workers
└─ 根據實際測試選擇

I/O < 50%(CPU 為主)
├─ 使用 Sync
├─ workers = CPU × 1.5
└─ CPU 性能優先

📈 性能測試對比

測試場景:商品列表頁(混合型)

# 模擬混合型任務
def mixed_workload_view(request):
    """
    I/O 60% + CPU 40%
    總時間約 700ms
    """
    # I/O:資料庫查詢(300ms)
    products = Product.objects.select_related('category')[:500]

    # I/O:Redis 讀取(100ms)
    cache_data = cache.get_many([f'p:{p.id}' for p in products])

    # CPU:數據處理(200ms)
    result = []
    for product in products:
        discount = calculate_discount(product)  # CPU 計算
        result.append({
            'id': product.id,
            'name': product.name,
            'price': float(product.price * (1 - discount)),
        })

    # CPU:排序(100ms)
    result.sort(key=lambda x: x['price'])

    return JsonResponse({'products': result})

測試結果(4 核 CPU,1000 並發請求)

方案 1:ASGI (Uvicorn)

workers = 4
worker_class = "uvicorn.workers.UvicornWorker"
ab -n 1000 -c 100 http://localhost:8000/mixed/

Results:
Time taken:       18.5 seconds
Requests per second: 54.05 [#/sec]
Mean time per request: 1850 [ms]
CPU usage: 75%
Memory: 680MB

分析:
✓ 並發處理好,I/O 等待時可處理其他請求
✗ CPU 計算部分會阻塞事件循環
→ 適合 I/O > 60% 的場景

方案 2:Gthread

workers = 4
worker_class = "gthread"
threads = 4
ab -n 1000 -c 100 http://localhost:8000/mixed/

Results:
Time taken:       16.2 seconds  ← 最快!
Requests per second: 61.73 [#/sec]
Mean time per request: 1620 [ms]
CPU usage: 82%
Memory: 720MB

分析:
✓ 平衡 I/O 和 CPU
16 個線程提供足夠並發
✓ 對混合型工作負載最優
→ 最適合 50-60% I/O 的場景 ⭐

方案 3:Sync (1.5x workers)

workers = 6  # 4 × 1.5
worker_class = "sync"
ab -n 1000 -c 100 http://localhost:8000/mixed/

Results:
Time taken:       20.8 seconds
Requests per second: 48.08 [#/sec]
Mean time per request: 2080 [ms]
CPU usage: 88%
Memory: 900MB

分析:
✓ CPU 計算性能最好
✗ I/O 等待時 worker 閒置
✗ 記憶體占用較高
→ 適合 CPU > 60% 的場景

性能對比總結

方案RPS響應時間CPU 使用記憶體最適場景
ASGI54.051850ms75%680MBI/O > 70%
Gthread61.731620ms82%720MBI/O 50-60%
Sync (1.5x)48.082080ms88%900MBCPU > 60%

結論:混合型應用(I/O 60% + CPU 40%),Gthread 表現最佳!


🎯 實戰案例:電商訂單處理

業務場景

訂單處理流程(混合型):
1. 查詢商品資訊(資料庫)
2. 檢查庫存(Redis)
3. 計算訂單總額(業務邏輯)
4. 計算運費(複雜規則)
5. 應用優惠券(業務邏輯)
6. 檢查用戶信用(外部 API)
7. 創建訂單(資料庫)
8. 扣減庫存(Redis)
9. 發送確認郵件(外部 SMTP)

I/O 操作:步驟 1, 2, 6, 7, 8, 9
CPU 操作:步驟 3, 4, 5

預估比例:I/O 55% + CPU 45%

應用代碼

# myapp/views.py
from django.db import transaction
from django.core.cache import cache
from django.http import JsonResponse
from decimal import Decimal
import requests

def create_order_view(request):
    """
    訂單處理 - 混合型應用

    時間分析:
    I/O 操作(550ms,55%):
    - 查詢商品:150ms
    - 檢查庫存(Redis):50ms
    - 檢查用戶信用(API):200ms
    - 創建訂單(資料庫):100ms
    - 發送郵件:50ms

    CPU 操作(450ms,45%):
    - 計算訂單總額:100ms
    - 計算運費:150ms
    - 應用優惠券:200ms
    """
    user = request.user
    cart_items = request.POST.getlist('items')

    # I/O:查詢商品資訊
    products = Product.objects.filter(
        id__in=cart_items
    ).select_related('category')

    # I/O:批量檢查庫存(Redis)
    stock_keys = [f'stock:{p.id}' for p in products]
    stocks = cache.get_many(stock_keys)

    # CPU:計算訂單總額(業務邏輯)
    subtotal = Decimal('0')
    order_items = []

    for product in products:
        quantity = int(request.POST.get(f'qty_{product.id}', 1))

        # 檢查庫存
        stock = stocks.get(f'stock:{product.id}', 0)
        if stock < quantity:
            return JsonResponse({
                'error': f'庫存不足:{product.name}'
            }, status=400)

        # CPU:計算小計
        item_total = product.price * quantity
        subtotal += item_total

        order_items.append({
            'product': product,
            'quantity': quantity,
            'price': product.price,
            'total': item_total,
        })

    # CPU:計算運費(複雜業務邏輯)
    shipping_fee = calculate_shipping_fee(
        user,
        products,
        subtotal
    )

    # CPU:應用優惠券(複雜折扣計算)
    discount = Decimal('0')
    if request.POST.get('coupon_code'):
        discount = calculate_coupon_discount(
            request.POST['coupon_code'],
            subtotal,
            products
        )

    # CPU:計算最終金額
    total = subtotal + shipping_fee - discount

    # I/O:檢查用戶信用(調用外部 API)
    credit_check = check_user_credit(user.id, float(total))
    if not credit_check['approved']:
        return JsonResponse({
            'error': '信用檢查未通過'
        }, status=400)

    # I/O:創建訂單(資料庫事務)
    with transaction.atomic():
        order = Order.objects.create(
            user=user,
            subtotal=subtotal,
            shipping_fee=shipping_fee,
            discount=discount,
            total=total,
            status='pending'
        )

        # 創建訂單明細
        for item in order_items:
            OrderItem.objects.create(
                order=order,
                product=item['product'],
                quantity=item['quantity'],
                price=item['price'],
                total=item['total']
            )

        # I/O:扣減庫存(Redis)
        for item in order_items:
            cache.decr(f'stock:{item["product"].id}', item['quantity'])

    # I/O:發送確認郵件(異步)
    send_order_confirmation_email.delay(order.id)

    return JsonResponse({
        'order_id': order.id,
        'total': float(total),
        'status': 'success'
    })


def calculate_shipping_fee(user, products, subtotal):
    """CPU 密集型:複雜運費計算"""
    # 基礎運費
    base_fee = Decimal('50')

    # 免運門檻
    if subtotal >= Decimal('1000'):
        return Decimal('0')

    # 重量計算
    total_weight = sum(p.weight for p in products)
    if total_weight > 10:  # 超過 10kg
        base_fee += Decimal('30')

    # 地區運費
    region_multiplier = {
        'north': Decimal('1.0'),
        'central': Decimal('1.2'),
        'south': Decimal('1.3'),
        'east': Decimal('1.5'),
    }
    multiplier = region_multiplier.get(user.region, Decimal('1.0'))

    return base_fee * multiplier


def calculate_coupon_discount(coupon_code, subtotal, products):
    """CPU 密集型:優惠券折扣計算"""
    try:
        coupon = Coupon.objects.get(code=coupon_code, is_active=True)
    except Coupon.DoesNotExist:
        return Decimal('0')

    # 檢查最低消費
    if subtotal < coupon.min_amount:
        return Decimal('0')

    # 計算折扣
    if coupon.discount_type == 'percentage':
        discount = subtotal * (coupon.discount_value / 100)
        return min(discount, coupon.max_discount)

    elif coupon.discount_type == 'fixed':
        return coupon.discount_value

    elif coupon.discount_type == 'product_specific':
        # 複雜計算:特定商品折扣
        applicable_total = sum(
            p.price
            for p in products
            if p.category_id in coupon.applicable_categories
        )
        return applicable_total * (coupon.discount_value / 100)

    return Decimal('0')

Gunicorn 配置

# gunicorn.conf.py - 混合型應用配置
import multiprocessing

# 伺服器規格:8 核 CPU,16GB RAM

bind = "0.0.0.0:8000"

# 混合型:使用 Gthread(平衡方案)
workers = 8  # = CPU 核心數
worker_class = "gthread"
threads = 4  # 每個 worker 4 個線程

# 總並發:8 workers × 4 threads = 32

# 超時配置
timeout = 120  # 混合型可能需要較長時間
graceful_timeout = 30
keepalive = 10  # 中等 keepalive

# 防止記憶體洩漏
max_requests = 1000
max_requests_jitter = 100

# 性能優化
preload_app = True
worker_tmp_dir = "/dev/shm"

# 日誌
loglevel = "info"
accesslog = "/var/log/gunicorn/order-access.log"
errorlog = "/var/log/gunicorn/order-error.log"

# Hooks
def post_fork(server, worker):
    from django import db
    from django.core.cache import cache

    db.connections.close_all()

    # 測試 Redis 連接
    try:
        cache.get('health_check')
    except Exception as e:
        server.log.error(f"Redis connection failed: {e}")

    server.log.info(
        f"Worker {worker.pid} ready (gthread mode: {threads} threads)"
    )

性能測試

# 壓測命令
ab -n 10000 -c 200 -p order.json \
   -T "application/json" \
   http://localhost:8000/api/orders/create/

# 結果:
Concurrency Level:      200
Time taken for tests:   158.3 seconds
Complete requests:      10000
Failed requests:        0
Requests per second:    63.17 [#/sec]
Time per request:       3166 [ms]
Time per request (mean, across all concurrent requests): 15.83 [ms]

# 資源使用:
CPU: 85% (充分利用)
Memory: 5.2GB / 16GB
Workers: 8 (all active)
Threads per worker: 4 (average 3.2 active)

# 結論:
✓ 配置良好,可處理高並發
✓ CPU 和記憶體使用合理
✓ 響應時間穩定

🚀 進階方案:ASGI + Celery

當你的混合型應用流量很大,且 CPU 密集型任務可以異步處理時,ASGI + Celery 是最佳方案。

架構概述

用戶請求
  ↓
┌─────────────────────────────────────────────┐
│  Nginx (負載均衡、靜態文件)                    │
└─────────────────────────────────────────────┘
  ↓
┌─────────────────────────────────────────────┐
│  Gunicorn + ASGI (Uvicorn Worker)            │
│                                              │
│  職責:處理 HTTP 請求                         │
│  • I/O 密集型:直接處理並返回 ✓               │
│  • CPU 密集型:提交到 Celery ✓                │
│                                              │
│  workers = 8                                │
│  worker_class = "uvicorn.workers.UvicornWorker" │
│  ├─ 高並發處理能力(數千並發)                  │
│  ├─ 不被 CPU 任務阻塞                         │
│  └─ 立即響應用戶                              │
└─────────────────────────────────────────────┘
  ↓
┌─────────────────────────────────────────────┐
│  Redis Queue (消息隊列)                       │
│  • 任務隊列                                   │
│  • 任務狀態存儲                                │
└─────────────────────────────────────────────┘
  ↓
┌─────────────────────────────────────────────┐
│  Celery Workers (prefork)                   │
│                                              │
│  職責:背景處理 CPU 密集型任務                  │
│  • 視頻轉碼                                   │
│  • 圖片處理                                   │
│  • 數據分析報表                                │
│  • 複雜計算                                   │
│                                              │
│  pool = "prefork"                           │
│  concurrency = 4  # = CPU 核心數              │
│  ├─ 多進程真正並行                             │
│  ├─ 不影響 HTTP 響應                          │
│  └─ 充分利用 CPU                              │
└─────────────────────────────────────────────┘

何時需要引入 Celery?

# 決策標準:

 需要 Celery 的場景

1. CPU 密集型任務 + 用戶可以等待
    視頻轉碼可以背景處理
    大量數據導出可以發郵件通知
    複雜報表生成可以異步生成

2. 任務執行時間 > 5    超過 HTTP timeout 容易超時
    用戶體驗差一直轉圈

3. 需要重試機制
    調用外部 API 可能失敗
    需要自動重試

4. 定時任務
    每日報表生成
    定期數據同步

 不需要 Celery 的場景

1. 用戶需要立即看到結果
    結帳頁面計算總額
    搜索結果
    實時推薦

2. 任務很快< 500ms
    簡單的業務邏輯計算
    輕量級數據處理
    提交到 Celery 反而更慢

3. 小型專案
    流量 < 100 QPS
    架構過於複雜
    維護成本高

配置範例

Gunicorn 配置(ASGI)

# gunicorn.conf.py
import multiprocessing

# HTTP 請求處理層
bind = "0.0.0.0:8000"
workers = multiprocessing.cpu_count() * 2  # 8 核 = 16 workers
worker_class = "uvicorn.workers.UvicornWorker"

# ASGI 優勢:
# - 高並發處理 I/O 密集型請求
# - 不被 CPU 任務阻塞(已提交到 Celery)

timeout = 60  # 較短超時(不處理 CPU 密集型)
graceful_timeout = 30
keepalive = 20

max_requests = 3000
max_requests_jitter = 300

preload_app = True
worker_tmp_dir = "/dev/shm"

loglevel = "info"

Celery 配置(CPU 密集型任務)

# celeryconfig.py
from kombu import Queue

# Broker 配置
broker_url = 'redis://localhost:6379/0'
result_backend = 'redis://localhost:6379/0'

# Worker 配置(CPU 密集型)
worker_pool = 'prefork'  # 多進程,類似 Gunicorn sync
worker_concurrency = 4   # = CPU 核心數
worker_prefetch_multiplier = 1  # 一次只取一個任務

# 任務時間限制
task_time_limit = 3600  # 1 小時硬超時
task_soft_time_limit = 3000  # 50 分鐘軟超時

# 防止記憶體洩漏
worker_max_tasks_per_child = 100

# 任務路由(可選)
task_routes = {
    'myapp.tasks.process_video': {'queue': 'video'},
    'myapp.tasks.generate_report': {'queue': 'report'},
    'myapp.tasks.process_image': {'queue': 'image'},
}

# 定義隊列
task_queues = (
    Queue('default'),
    Queue('video'),
    Queue('report'),
    Queue('image'),
)

啟動命令

# 1. 啟動 Gunicorn(處理 HTTP)
gunicorn -c gunicorn.conf.py myapp.asgi:application

# 2. 啟動 Celery Worker(處理 CPU 密集型)
celery -A myapp worker \
    --pool=prefork \
    --concurrency=4 \
    --loglevel=info \
    --max-tasks-per-child=100

# 3. 啟動 Celery Beat(定時任務,可選)
celery -A myapp beat --loglevel=info

# 4. 啟動 Flower(監控,可選)
celery -A myapp flower --port=5555

應用代碼範例

# myapp/tasks.py
from celery import shared_task
from .models import VideoProcessJob
import subprocess

@shared_task(bind=True, max_retries=3)
def process_video_task(self, video_path, job_id):
    """
    CPU 密集型:視頻轉碼
    在 Celery Worker 中執行
    """
    job = VideoProcessJob.objects.get(id=job_id)
    job.status = 'processing'
    job.save()

    try:
        # CPU 密集型處理(FFmpeg)
        output_path = f'/tmp/output_{job_id}.mp4'
        subprocess.run([
            'ffmpeg',
            '-i', video_path,
            '-c:v', 'libx264',
            '-preset', 'medium',
            '-crf', '23',
            output_path
        ], check=True)

        # 上傳到存儲
        url = upload_to_s3(output_path)

        job.status = 'completed'
        job.result_url = url
        job.save()

        return url

    except Exception as exc:
        job.status = 'failed'
        job.error = str(exc)
        job.save()

        # 重試
        raise self.retry(exc=exc, countdown=60)


# myapp/views.py
from django.http import JsonResponse
from .tasks import process_video_task
from .models import VideoProcessJob

async def upload_video_view(request):
    """
    I/O 密集型:接收上傳
    在 Gunicorn ASGI 中處理
    """
    video = request.FILES['video']

    # I/O:保存文件
    video_path = await save_video_async(video)

    # 創建任務記錄
    job = await VideoProcessJob.objects.acreate(
        status='pending',
        video_path=video_path
    )

    # 提交到 Celery(立即返回)
    process_video_task.delay(video_path, job.id)

    # 立即響應用戶
    return JsonResponse({
        'job_id': job.id,
        'status': 'pending',
        'message': '視頻上傳成功,正在處理中'
    })


async def check_job_status_view(request, job_id):
    """
    查詢任務狀態
    在 Gunicorn ASGI 中處理
    """
    job = await VideoProcessJob.objects.aget(id=job_id)

    return JsonResponse({
        'job_id': job.id,
        'status': job.status,
        'progress': job.progress,
        'result_url': job.result_url if job.status == 'completed' else None,
        'error': job.error if job.status == 'failed' else None,
    })

優勢分析

特性ASGI + Celery僅 Gthread僅 ASGI
I/O 並發能力⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐
CPU 任務不阻塞 HTTP✅ 完全不阻塞❌ 會阻塞❌ 會阻塞
CPU 計算性能⭐⭐⭐⭐⭐ (獨立處理)⭐⭐⭐⭐⭐
用戶體驗✅ 立即響應⚠️ 需等待⚠️ 需等待
資源利用率⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐
可擴展性⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐
架構複雜度⚠️ 高✅ 低✅ 低
開發成本⚠️ 高✅ 低✅ 低
維護成本⚠️ 高✅ 低✅ 低

性能對比

# 測試場景:
# - 1000 個請求
# - 每個請求包含:
#   - I/O:查詢資料庫(200ms)
#   - CPU:圖片處理(500ms)

# 配置 1:Gthread(同步處理)
workers = 8
worker_class = "gthread"
threads = 4

結果:
- 總時間:52 秒
- RPS:19.2
- 用戶等待時間:700ms(I/O + CPU)
- HTTP worker 利用率:85%

# 配置 2:ASGI(CPU 阻塞事件循環)
workers = 8
worker_class = "uvicorn.workers.UvicornWorker"

結果:
- 總時間:58 秒  ← 更慢!
- RPS:17.2
- 用戶等待時間:700ms
- 事件循環被 CPU 阻塞

# 配置 3:ASGI + Celery(最佳)⭐
Gunicorn:
  workers = 8
  worker_class = "uvicorn.workers.UvicornWorker"

Celery:
  concurrency = 4
  pool = "prefork"

結果:
- 總時間:15 秒  ← 快 3.5 倍!
- RPS:66.7
- 用戶等待時間:200ms(只等 I/O)
- HTTP 響應快,CPU 背景處理

限制和注意事項

⚠️ 架構複雜度

# 需要維護的組件:
1. Gunicorn (ASGI)
2. Celery Workers
3. Redis/RabbitMQ (消息隊列)
4. PostgreSQL (資料庫)
5. Celery Beat (定時任務可選)
6. Flower (監控可選)

# 部署複雜度增加:
- 多個服務需要協同啟動
- 需要監控多個服務健康狀態
- 故障排查更複雜問題可能在任何一層

⚠️ 不適合需要同步結果的場景

# ❌ 錯誤示例:
def checkout_view(request):
    """
    用戶點擊結帳,需要立即顯示總金額
    這種場景不能用 Celery!
    """
    # ❌ 錯誤:用戶要等多久?
    job = calculate_total.delay(cart_items)
    # 用戶一直看到轉圈?不知道要等多久?

    # ✅ 正確:必須同步計算
    total = calculate_total_sync(cart_items)
    return JsonResponse({'total': total})

⚠️ 輕量級任務反而更慢

# 任務執行時間:50ms

# 僅 ASGI(直接處理):
# 總時間:50ms

# ASGI + Celery:
# - 序列化任務:5ms
# - 發送到 Queue:5ms
# - Celery 取出:5ms
# - 執行任務:50ms
# - 返回結果:5ms
# - 查詢結果:10ms
# 總時間:80ms

# 結論:輕量級任務(< 500ms)不適合 Celery

適用場景總結

# ✅ 非常適合 ASGI + Celery:

1. 高流量 Web 應用
   - I/O 密集型請求> 1000 QPS
   - CPU 密集型任務可異步處理
   - 用戶體驗優先

2. 內容處理平台
   - 視頻上傳 + 背景轉碼
   - 圖片上傳 + 背景處理
   - 文件導出可發郵件通知

3. SaaS 平台
   - 大量用戶在線I/O
   - 定期生成報表CPU
   - 數據分析CPU

4. 電商平台
   - 商品瀏覽I/O
   - 訂單處理混合可異步部分
   - 數據統計CPU

# ❌ 不適合 ASGI + Celery:

1. 小型專案
   - 流量 < 100 QPS
   - 架構過於複雜

2. 需要實時結果
   - 金融交易
   - 遊戲服務器
   - 即時競價

3. 簡單 API 服務
   - 只是 CRUD 操作
   - 沒有複雜計算

4. 任務很快的場景
   - < 500ms 的計算
   - 提交到 Celery 反而更慢

最終建議

選擇配置的決策流程:

你的應用是否滿足以下條件?
1. 流量 > 1000 QPS
2. 有 CPU 密集型任務
3. CPU 任務可以異步處理
4. 用戶不需要立即看到 CPU 計算結果

全部滿足?
└─ ✅ 使用 ASGI + Celery(最佳方案)

不全部滿足?
├─ 高流量 + 純 I/O → ASGI
├─ 混合型 + 中等流量 → Gthread
└─ 其他 → 根據實際需求選擇

記住:沒有萬能配置,只有最適合的配置。ASGI + Celery 是特定場景下的最佳方案,而非所有場景的唯一答案。


💡 面試常見問題

Q1:什麼是混合型應用?如何判斷?

完整答案:

混合型應用是指既有大量 I/O 操作又有相當比例 CPU 計算的應用。

判斷方法:

1. 分析時間占比

# 使用 Django Debug Toolbar 或 cProfile
# 統計 I/O 時間和 CPU 時間

I/O 時間50-70%
CPU 時間30-50%
 混合型

2. 檢視代碼特徵

def view(request):
    # I/O:資料庫查詢
    data = Model.objects.filter(...)

    # CPU:數據處理
    result = [process(item) for item in data]

    # I/O:外部 API
    api_result = call_api()

    # CPU:複雜計算
    final = calculate(result, api_result)

# 既有 I/O 又有 CPU → 混合型

3. 監控伺服器指標

# 運行時觀察
top

# 混合型特徵:
# - CPU 使用率:60-80%(中高)
# - I/O wait:15-30%(中等)
# - 兩者都有明顯占比

典型場景:

  • 電商訂單處理(查詢 + 業務邏輯)
  • 數據報表生成(查詢 + 統計計算)
  • 內容推薦系統(查詢 + 相似度計算)

Q2:混合型應用應該用什麼 Worker?

完整答案:

混合型應用的配置需要根據 I/O vs CPU 的比例決定:

配置策略:

I/O 比例推薦配置Worker 類型原因
> 70%UvicornASGII/O 為主,需要高並發
60-70%Gthread多線程平衡方案 ⭐
50-60%Gthread 或 Sync視並發需求需要測試
< 50%Sync多進程CPU 為主

推薦配置(I/O 60% + CPU 40%):

# Gthread - 平衡方案
workers = CPU 核心數
worker_class = "gthread"
threads = 4

# 總並發:workers × threads
# 例如:4 核 = 4 workers × 4 threads = 16 並發

為什麼 Gthread 適合混合型?

  • 多進程:每個 worker 獨立,CPU 計算互不影響
  • 多線程:I/O 等待時線程可切換,提高並發
  • 平衡:兼顧 I/O 並發和 CPU 性能

注意事項:

  • Gthread 受 Python GIL 限制
  • CPU 密集型部分性能不如 Sync
  • I/O 並發能力不如 ASGI
  • 但混合型來說是最佳平衡點

Q3:Gthread 和 ASGI 如何選擇?

完整答案:

這兩種配置都可以處理並發,但原理和適用場景不同:

Gthread (多線程):

workers = 4
worker_class = "gthread"
threads = 4
# 總並發:16

# 原理:
# - 每個 worker 是獨立進程
# - 每個進程內有多個線程
# - 線程共享進程記憶體
# - 受 Python GIL 限制

優點:

  • CPU 計算:每個進程獨立 GIL,真正並行
  • I/O 操作:線程切換,提供並發
  • 平衡性好

缺點:

  • 線程間受 GIL 限制
  • 並發能力不如 ASGI

ASGI (異步協程):

workers = 4
worker_class = "uvicorn.workers.UvicornWorker"
# 每個 worker 可處理數千並發

# 原理:
# - 基於 asyncio 事件循環
# - 協程切換非常輕量
# - I/O 等待時自動切換

優點:

  • 並發能力極強(數千並發)
  • I/O 密集型性能最好
  • 記憶體占用少

缺點:

  • CPU 計算會阻塞事件循環
  • 不適合 CPU 密集型

選擇標準:

I/O > 70%
└─ ASGI(高並發優先)

I/O 60-70%
├─ 需要極高並發 → ASGI
└─ CPU 計算較多 → Gthread

I/O 50-60%
└─ Gthread(平衡優先)

I/O < 50%
└─ Sync(CPU 優先)

實測對比(I/O 60% + CPU 40%):

  • ASGI: 54 RPS
  • Gthread: 62 RPS ⭐(最佳)
  • Sync: 48 RPS

Q4:如何優化混合型應用的性能?

完整答案:

優化策略分為四個層次:

1. 配置優化(快速見效)

# 選擇合適的 Worker 類型
# I/O 60% + CPU 40% → Gthread

workers = CPU 核心數
worker_class = "gthread"
threads = 4

2. I/O 優化(重點!)

# a. 資料庫查詢優化
# 使用 select_related、prefetch_related
products = Product.objects.select_related(
    'category'
).prefetch_related('images')

# b. 使用緩存
from django.core.cache import cache
result = cache.get('key')
if not result:
    result = expensive_query()
    cache.set('key', result, timeout=300)

# c. 批量操作
# 避免循環查詢(N+1 問題)
ids = [item.id for item in items]
data = Model.objects.in_bulk(ids)

3. CPU 優化

# a. 算法優化
# 使用更高效的算法
# 例如:字典查找 O(1) vs 列表查找 O(n)

# b. 使用 C 擴展
# Pandas、NumPy 等底層用 C 實現
import pandas as pd
df = pd.DataFrame(data)  # 比純 Python 快很多

# c. 移除不必要的計算
# 只計算需要的部分

4. 架構優化(根本解決)

# a. 異步化 I/O 操作
async def view(request):
    # 並發執行多個 I/O
    task1 = fetch_data()
    task2 = call_api()
    result1, result2 = await asyncio.gather(task1, task2)

# b. CPU 密集型移到 Celery
@shared_task
def heavy_calculation(data):
    # 背景處理 CPU 密集型任務
    return process(data)

# c. 使用緩存層
# Redis 緩存熱數據,減少資料庫壓力

優先級:

  1. I/O 優化(select_related、緩存): 2-5x ⭐⭐⭐
  2. 配置優化(選對 Worker): 20-30% ⭐⭐
  3. CPU 優化(算法、C 擴展): 10-50% ⭐⭐
  4. 架構優化(異步、Celery): 10x+ ⭐⭐⭐

✅ 重點回顧

混合型應用特徵

I/O 等待:50-70%
CPU 計算:30-50%
兩者都不可忽視

配置策略

# I/O > 70%
workers = CPU_cores
worker_class = "uvicorn.workers.UvicornWorker"

# I/O 60-70%(推薦)
workers = CPU_cores
worker_class = "gthread"
threads = 4

# I/O 50-60%
# 視並發需求選擇 Gthread 或 Sync

# I/O < 50%
workers = CPU_cores * 1.5
worker_class = "sync"

Gthread 最適合混合型

優點:
✓ 平衡 I/O 和 CPU
✓ 多進程 + 多線程
✓ 中等並發能力
✓ 實測性能最佳

實測結果(I/O 60% + CPU 40%):
- Gthread: 62 RPS ⭐
- ASGI: 54 RPS
- Sync: 48 RPS

優化重點

1. I/O 優化:select_related、緩存
2. 配置優化:選擇 Gthread
3. CPU 優化:算法、C 擴展
4. 架構優化:異步、Celery

📚 接下來

下一篇:03-4. 生產環境部署

  • 完整的生產環境部署流程
  • Nginx + Gunicorn 配置
  • 監控和日誌
  • 故障排查

相關章節:


最後更新:2025-10-31

0%