Django 面試準備 08-4:防止超賣方案

構建多層防護體系,徹底杜絕超賣問題

08-4. 防止超賣方案

📌 什麼是超賣?

超賣(Overselling):指實際銷售的數量超過了庫存數量。

典型場景

商品庫存:10 件
訂單 A:購買 5 件 ✓
訂單 B:購買 6 件 ✓
實際賣出:11 件 ⚠️

結果:超賣 1 件!

🔴 超賣的危害

1. 業務損失

場景:秒殺活動,iPhone 只有 10 台
結果:賣出 15 台
損失:5 台 × NT$ 40,000 = NT$ 200,000

2. 用戶投訴

用戶下單成功 → 支付完成 → 被告知缺貨
結果:用戶體驗極差,引發投訴和退款

3. 法律風險

根據《消費者保護法》,已收款但無法交貨可能構成欺詐。


🎯 防超賣的核心原則

1. 單一數據源

❌ 錯誤:多個地方維護庫存
- 數據庫有庫存
- Redis 有庫存
- 內存中也有庫存
→ 容易不一致

✅ 正確:Redis 為主,數據庫為輔
- Redis:實時庫存(高性能)
- 數據庫:持久化庫存(可靠性)

2. 原子操作

所有庫存扣減操作必須是原子的:

  • 使用 F() 表達式
  • 使用 Redis Lua 腳本
  • 使用分布式鎖

3. 多層防護

第一層:前端限制(防止誤操作)
第二層:應用層限流(防止惡意刷單)
第三層:Redis 原子扣減(防止超賣)
第四層:數據庫最終校驗(兜底保護)

💻 完整防超賣方案

架構設計

                  用戶請求
                     ↓
              ┌──────────────┐
              │ 1. 前端校驗  │
              │ - 按鈕置灰   │
              │ - 防重複點擊 │
              └──────┬───────┘
                     ↓
              ┌──────────────┐
              │ 2. 應用層限流│
              │ - IP 限流    │
              │ - 用戶限流   │
              └──────┬───────┘
                     ↓
              ┌──────────────┐
              │ 3. Redis 扣減│
              │ - Lua 原子操作│
              │ - 庫存預扣   │
              └──────┬───────┘
                     ↓
              ┌──────────────┐
              │ 4. 異步訂單  │
              │ - 消息隊列   │
              │ - DB 扣減    │
              └──────┬───────┘
                     ↓
              ┌──────────────┐
              │ 5. 數據庫校驗│
              │ - 最終一致性 │
              └──────────────┘

🛡️ 第一層:前端防護

<!DOCTYPE html>
<html>
<head>
    <title>商品購買</title>
</head>
<body>
    <h1>iPhone 15 Pro Max</h1>
    <p>庫存:<span id="stock">10</span></p>

    <button id="buy-btn" onclick="buyProduct()">立即購買</button>

    <script>
        let clicking = false;

        function buyProduct() {
            // 1. 防止重複點擊
            if (clicking) {
                alert('請勿重複點擊');
                return;
            }

            // 2. 檢查庫存
            let stock = parseInt(document.getElementById('stock').innerText);
            if (stock <= 0) {
                alert('商品已售罄');
                return;
            }

            // 3. 鎖定按鈕
            clicking = true;
            document.getElementById('buy-btn').disabled = true;
            document.getElementById('buy-btn').innerText = '處理中...';

            // 4. 發送請求
            fetch('/api/purchase/', {
                method: 'POST',
                headers: {
                    'Content-Type': 'application/json',
                    'X-CSRFToken': getCookie('csrftoken')
                },
                body: JSON.stringify({product_id: 123})
            })
            .then(response => response.json())
            .then(data => {
                if (data.status === 'success') {
                    alert('購買成功');
                    // 更新庫存顯示
                    document.getElementById('stock').innerText = data.remaining_stock;
                } else {
                    alert(data.message);
                }
            })
            .finally(() => {
                // 5. 恢復按鈕(延遲2秒防止重複點擊)
                setTimeout(() => {
                    clicking = false;
                    document.getElementById('buy-btn').disabled = false;
                    document.getElementById('buy-btn').innerText = '立即購買';
                }, 2000);
            });
        }

        function getCookie(name) {
            let value = `; ${document.cookie}`;
            let parts = value.split(`; ${name}=`);
            if (parts.length === 2) return parts.pop().split(';').shift();
        }
    </script>
</body>
</html>

🚦 第二層:應用層限流

from django.core.cache import cache
from django.http import JsonResponse
from functools import wraps

def rate_limit_purchase(max_requests=3, time_window=60):
    """
    購買限流裝飾器
    每個用戶在 time_window 秒內最多購買 max_requests 次
    """
    def decorator(func):
        @wraps(func)
        def wrapper(request, *args, **kwargs):
            user_id = request.user.id if request.user.is_authenticated else None
            ip = request.META.get('REMOTE_ADDR')

            # 生成限流鍵
            cache_key = f'purchase_limit:{user_id or ip}'

            # 獲取當前請求次數
            requests = cache.get(cache_key, 0)

            if requests >= max_requests:
                return JsonResponse({
                    'status': 'error',
                    'message': f'您的操作過於頻繁,請{time_window}秒後再試'
                }, status=429)

            # 增加計數
            cache.set(cache_key, requests + 1, time_window)

            return func(request, *args, **kwargs)

        return wrapper
    return decorator

🔥 第三層:Redis 原子扣減(核心)

import redis
import json

class InventoryManager:
    """庫存管理器"""

    def __init__(self):
        self.redis_client = redis.StrictRedis(
            host='localhost',
            port=6379,
            db=0,
            decode_responses=True
        )

    def init_stock(self, product_id, stock):
        """初始化庫存"""
        key = f'inventory:{product_id}'
        self.redis_client.set(key, stock)

    def get_stock(self, product_id):
        """獲取庫存"""
        key = f'inventory:{product_id}'
        stock = self.redis_client.get(key)
        return int(stock) if stock else 0

    def deduct_stock(self, product_id, quantity=1):
        """
        扣減庫存(Lua 腳本保證原子性)
        返回:
          - 成功:剩餘庫存
          - 失敗:-1
        """
        key = f'inventory:{product_id}'

        # Lua 腳本
        lua_script = """
        local stock = redis.call('GET', KEYS[1])
        if not stock then
            return -2  -- 商品不存在
        end

        stock = tonumber(stock)
        local quantity = tonumber(ARGV[1])

        if stock >= quantity then
            local remaining = redis.call('DECRBY', KEYS[1], quantity)
            return remaining
        else
            return -1  -- 庫存不足
        end
        """

        result = self.redis_client.eval(lua_script, 1, key, quantity)
        return result

    def restore_stock(self, product_id, quantity=1):
        """恢復庫存(用於訂單取消)"""
        key = f'inventory:{product_id}'
        self.redis_client.incrby(key, quantity)

    def check_user_limit(self, product_id, user_id):
        """檢查用戶購買限制(每人限購1件)"""
        key = f'inventory:user_limit:{product_id}'
        return self.redis_client.sismember(key, user_id)

    def mark_user_purchased(self, product_id, user_id):
        """標記用戶已購買"""
        key = f'inventory:user_limit:{product_id}'
        self.redis_client.sadd(key, user_id)
        # 設置過期時間(活動結束後自動清理)
        self.redis_client.expire(key, 86400)  # 24小時


inventory_manager = InventoryManager()

🎯 第四層:完整購買流程

from django.views.decorators.http import require_POST
from django.contrib.auth.decorators import login_required
from django.db import transaction
from django.db.models import F
import logging
import uuid

logger = logging.getLogger(__name__)

@require_POST
@login_required
@rate_limit_purchase(max_requests=3, time_window=60)
def purchase_product(request):
    """
    商品購買接口(完整防超賣方案)
    """
    try:
        data = json.loads(request.body)
        product_id = data.get('product_id')
        quantity = data.get('quantity', 1)
        user = request.user

        # ===== 第一步:基礎校驗 =====
        if quantity <= 0 or quantity > 5:
            return JsonResponse({
                'status': 'error',
                'message': '購買數量不合法'
            })

        # 查詢商品信息
        try:
            product = Product.objects.get(id=product_id)
        except Product.DoesNotExist:
            return JsonResponse({
                'status': 'error',
                'message': '商品不存在'
            })

        # ===== 第二步:檢查用戶限購 =====
        if inventory_manager.check_user_limit(product_id, user.id):
            return JsonResponse({
                'status': 'error',
                'message': '您已經購買過該商品了'
            })

        # ===== 第三步:Redis 原子扣減庫存 =====
        remaining_stock = inventory_manager.deduct_stock(product_id, quantity)

        if remaining_stock == -2:
            return JsonResponse({
                'status': 'error',
                'message': '商品信息異常'
            })

        if remaining_stock == -1:
            return JsonResponse({
                'status': 'error',
                'message': '庫存不足'
            })

        # ===== 第四步:標記用戶已購買 =====
        inventory_manager.mark_user_purchased(product_id, user.id)

        # ===== 第五步:異步創建訂單 =====
        order_no = f'ORD{int(time.time())}{uuid.uuid4().hex[:8].upper()}'

        # 使用 Celery 異步處理
        from .tasks import create_order_task
        create_order_task.delay(
            order_no=order_no,
            product_id=product_id,
            user_id=user.id,
            quantity=quantity
        )

        # ===== 第六步:返回成功響應 =====
        return JsonResponse({
            'status': 'success',
            'message': '購買成功',
            'order_no': order_no,
            'remaining_stock': remaining_stock
        })

    except Exception as e:
        logger.error(f'Purchase failed: {e}', exc_info=True)
        return JsonResponse({
            'status': 'error',
            'message': '系統錯誤,請稍後再試'
        }, status=500)

🔧 第五層:異步訂單處理

from celery import shared_task
from django.db import transaction
from django.db.models import F

@shared_task(bind=True, max_retries=3)
def create_order_task(self, order_no, product_id, user_id, quantity):
    """
    異步創建訂單
    """
    try:
        with transaction.atomic():
            # 1. 再次檢查並扣減數據庫庫存(使用 F() 表達式)
            affected_rows = Product.objects.filter(
                id=product_id,
                stock__gte=quantity
            ).update(stock=F('stock') - quantity)

            if affected_rows == 0:
                # 數據庫庫存不足(異常情況)
                logger.error(f'DB stock insufficient for product {product_id}')

                # 恢復 Redis 庫存
                inventory_manager.restore_stock(product_id, quantity)

                # 創建失敗訂單
                Order.objects.create(
                    order_no=order_no,
                    product_id=product_id,
                    user_id=user_id,
                    quantity=quantity,
                    status='failed',
                    fail_reason='庫存不足'
                )

                return

            # 2. 創建訂單
            order = Order.objects.create(
                order_no=order_no,
                product_id=product_id,
                user_id=user_id,
                quantity=quantity,
                status='pending'
            )

            # 3. 其他業務邏輯
            # - 扣減用戶餘額
            # - 發送通知
            # - 記錄日誌

            logger.info(f'Order created successfully: {order_no}')

            return order.id

    except Exception as e:
        logger.error(f'Create order failed: {e}', exc_info=True)

        # 重試
        raise self.retry(exc=e, countdown=5)

🔍 監控和告警

1. 庫存監控

from django.core.management.base import BaseCommand

class Command(BaseCommand):
    help = '監控 Redis 和數據庫庫存差異'

    def handle(self, *args, **options):
        products = Product.objects.all()

        for product in products:
            redis_stock = inventory_manager.get_stock(product.id)
            db_stock = product.stock

            diff = abs(redis_stock - db_stock)

            if diff > 10:
                # 差異過大,發送告警
                logger.warning(
                    f'Stock mismatch detected: '
                    f'Product {product.id}, '
                    f'Redis: {redis_stock}, '
                    f'DB: {db_stock}, '
                    f'Diff: {diff}'
                )

                # 發送郵件/短信通知
                send_alert_email(
                    subject='庫存異常告警',
                    message=f'商品 {product.name} 庫存差異過大'
                )

2. 超賣檢測

@shared_task
def check_overselling():
    """
    定期檢測是否有超賣情況
    """
    products = Product.objects.annotate(
        sold_quantity=Count('order')
    ).filter(
        sold_quantity__gt=F('initial_stock')
    )

    if products.exists():
        for product in products:
            logger.critical(
                f'Overselling detected! '
                f'Product: {product.name}, '
                f'Initial stock: {product.initial_stock}, '
                f'Sold: {product.sold_quantity}'
            )

            # 緊急告警
            send_urgent_alert(product)

🎯 極端場景處理

場景 1:Redis 宕機

def purchase_with_fallback(request, product_id):
    """
    帶降級策略的購買接口
    """
    try:
        # 嘗試使用 Redis
        remaining_stock = inventory_manager.deduct_stock(product_id, 1)

        if remaining_stock >= 0:
            create_order_task.delay(...)
            return JsonResponse({'status': 'success'})

    except redis.ConnectionError:
        # Redis 連接失敗,降級到數據庫鎖
        logger.warning('Redis connection failed, fallback to DB lock')

        with transaction.atomic():
            product = Product.objects.select_for_update().get(id=product_id)

            if product.stock >= 1:
                product.stock -= 1
                product.save()

                # 創建訂單
                order = Order.objects.create(...)
                return JsonResponse({'status': 'success'})
            else:
                return JsonResponse({'status': 'error', 'message': '庫存不足'})

    return JsonResponse({'status': 'error', 'message': '庫存不足'})

場景 2:訂單超時未支付

@shared_task
def cancel_unpaid_orders():
    """
    取消超時未支付的訂單,恢復庫存
    """
    # 查詢 30 分鐘前創建但未支付的訂單
    timeout = timezone.now() - timedelta(minutes=30)

    orders = Order.objects.filter(
        status='pending',
        created_at__lt=timeout
    )

    for order in orders:
        with transaction.atomic():
            # 1. 更新訂單狀態
            order.status = 'cancelled'
            order.save()

            # 2. 恢復數據庫庫存
            Product.objects.filter(id=order.product_id).update(
                stock=F('stock') + order.quantity
            )

            # 3. 恢復 Redis 庫存
            inventory_manager.restore_stock(order.product_id, order.quantity)

            logger.info(f'Order {order.order_no} cancelled, stock restored')

📊 方案總結

防護層級技術方案目的
第一層前端校驗防止用戶誤操作
第二層應用層限流防止惡意刷單
第三層Redis Lua 腳本原子扣減,防止超賣
第四層異步訂單 + 消息隊列削峰填谷
第五層數據庫 F() 表達式最終校驗,確保一致性

💡 面試要點

Q1: 如何徹底避免超賣?

答: 多層防護 + 原子操作:

  1. Redis Lua 腳本確保原子扣減
  2. 數據庫 F() 表達式最終兜底
  3. 監控告警及時發現問題

Q2: Redis 和數據庫庫存不一致怎麼辦?

答:

  • 預防:使用事務、Lua 腳本確保操作原子性
  • 檢測:定時任務比對 Redis 和數據庫庫存
  • 修復:以數據庫為準,重新同步到 Redis

Q3: 秒殺和普通購買有什麼區別?

答:

維度普通購買秒殺
並發量極高
庫存操作可以直接操作 DB必須用 Redis
訂單創建同步異步(消息隊列)
限流不需要必須限流

Q4: 如果要設計一個秒殺系統,你會怎麼做?

答:

  1. 動靜分離:商品詳情頁靜態化,部署到 CDN
  2. 多級限流:CDN → Nginx → 應用層層限流
  3. Redis 預扣:庫存預熱到 Redis,Lua 腳本原子扣減
  4. 異步處理:消息隊列削峰,異步創建訂單
  5. 監控告警:實時監控庫存、訂單、系統狀態

🎓 總結

防止超賣是一個系統工程,需要:

  1. 技術手段:原子操作、分布式鎖、消息隊列
  2. 架構設計:多層防護、降級策略、監控告警
  3. 業務規則:限購、限流、支付超時取消

只有將這些方面結合起來,才能構建一個健壯的防超賣體系。


🏆 第八章完結

恭喜你完成了「高並發場景處理」章節!

你已經掌握了:

  • ✅ 庫存扣減問題的解決方案
  • ✅ 秒殺系統的完整設計
  • ✅ 分布式鎖的實現與應用
  • ✅ 防止超賣的多層防護體系

這些知識不僅對面試有幫助,在實際工作中也會頻繁用到。

閱讀時間:10 分鐘

0%