Django 面試準備 09-3:緩存穿透、擊穿、雪崩

深入理解三大緩存問題與完整解決方案

09-3. 緩存穿透、擊穿、雪崩(Cache Penetration, Breakdown, Avalanche)

📌 三大緩存問題概述

在高並發系統中,緩存是性能的關鍵。但不當使用會引發嚴重問題:

問題原因影響嚴重程度
穿透 (Penetration)查詢不存在的數據每次都打到數據庫⚠️⚠️⚠️
擊穿 (Breakdown)熱點數據過期瞬間大量請求打到數據庫⚠️⚠️⚠️⚠️
雪崩 (Avalanche)大量緩存同時過期數據庫瞬間崩潰⚠️⚠️⚠️⚠️⚠️

🔴 問題 1:緩存穿透(Cache Penetration)

什麼是緩存穿透(Cache Penetration)?

定義: 查詢一個不存在的數據,緩存和數據庫都沒有,導致每次請求都打到數據庫。

典型場景

# 用戶查詢 product_id = 99999(不存在)
def get_product(product_id):
    # 1. 查緩存 → 沒有
    product = cache.get(f'product:{product_id}')
    if product:
        return product

    # 2. 查數據庫 → 沒有
    try:
        product = Product.objects.get(id=product_id)
        cache.set(f'product:{product_id}', product, 300)
        return product
    except Product.DoesNotExist:
        return None  # ⚠️ 沒有緩存空結果!

攻擊演示

# 駭客攻擊:故意查詢不存在的資料
for i in range(100000):
    # 故意用不存在的 ID
    requests.get(f'/api/user?id=-{i}')

# 結果:
# - 10 萬次請求
# - 全部打到資料庫(Redis 沒用)
# - 資料庫被打爆 💥

後果:

資料庫壓力:
正常:    ██ 10% CPU
被攻擊:  ████████████████████ 100% CPU 💥

網站:
正常:    回應時間 100ms
被攻擊:  回應時間 10 秒 → 超時 → 掛掉 ✗

真實影響:

  • 💥 數據庫 CPU 飆升至 100%
  • 🐌 正常請求也變慢(連帶影響)
  • ❌ 用戶無法訪問網站
  • 💰 可能觸發 DDoS 警報,產生額外費用

✅ 解決方案 1-1:緩存空值

from django.core.cache import cache

def get_product_v1(product_id):
    """緩存空值方案"""
    cache_key = f'product:{product_id}'
    product = cache.get(cache_key)

    # 命中緩存(包括空值)
    if product is not None:
        if product == 'NULL':
            return None
        return product

    # 查詢數據庫
    try:
        product = Product.objects.get(id=product_id)
        # 緩存 5 分鐘
        cache.set(cache_key, product, timeout=300)
        return product
    except Product.DoesNotExist:
        # ✅ 緩存空值,但設置較短過期時間
        cache.set(cache_key, 'NULL', timeout=60)
        return None

優點:

  • ✅ 簡單易實現
  • ✅ 防止重複查詢

缺點:

  • ⚠️ 佔用緩存空間
  • ⚠️ 如果攻擊大量不同 ID,仍會佔用大量內存

✅ 解決方案 1-2:布隆過濾器(Bloom Filter)

什麼是布隆過濾器(Bloom Filter)?

布隆過濾器(Bloom Filter) 是一種概率型數據結構,用於判斷元素是否不存在

特性:

  • ✅ 如果返回「不存在」,一定不存在
  • ⚠️ 如果返回「可能存在」,可能誤判

Django + Redis 實現

from redis import Redis
from redis.commands.bf import BFBloom

class BloomFilter:
    def __init__(self):
        self.redis_client = Redis(host='localhost', port=6379, db=0)
        self.bf = BFBloom(self.redis_client)
        self.key = 'product:bloom_filter'

    def init_filter(self):
        """初始化:將所有存在的 ID 加入過濾器"""
        # 創建布隆過濾器(預計 100 萬條數據,錯誤率 0.01)
        self.bf.create(self.key, capacity=1000000, error_rate=0.01)

        # 將所有商品 ID 加入過濾器
        product_ids = Product.objects.values_list('id', flat=True)
        for product_id in product_ids:
            self.bf.add(self.key, product_id)

    def exists(self, product_id):
        """檢查 ID 是否可能存在"""
        return self.bf.exists(self.key, product_id)

    def add(self, product_id):
        """添加新 ID"""
        self.bf.add(self.key, product_id)

# 全局實例
bloom_filter = BloomFilter()

def get_product_v2(product_id):
    """使用布隆過濾器"""
    # 1. 先檢查布隆過濾器
    if not bloom_filter.exists(product_id):
        # 確定不存在,直接返回
        return None

    # 2. 可能存在,查緩存
    cache_key = f'product:{product_id}'
    product = cache.get(cache_key)
    if product is not None:
        return product

    # 3. 查數據庫
    try:
        product = Product.objects.get(id=product_id)
        cache.set(cache_key, product, timeout=300)
        return product
    except Product.DoesNotExist:
        # 誤判(很少發生),緩存空值
        cache.set(cache_key, 'NULL', timeout=60)
        return None

在模型信號中更新布隆過濾器

from django.db.models.signals import post_save, post_delete
from django.dispatch import receiver

@receiver(post_save, sender=Product)
def add_to_bloom_filter(sender, instance, created, **kwargs):
    """新增商品時更新布隆過濾器"""
    if created:
        bloom_filter.add(instance.id)

@receiver(post_delete, sender=Product)
def remove_from_cache(sender, instance, **kwargs):
    """刪除商品時清除緩存"""
    cache.delete(f'product:{instance.id}')
    # 注意:布隆過濾器不支持刪除,只能重建

🔴 問題 2:緩存擊穿(Cache Breakdown / Hotkey Expiration)

什麼是緩存擊穿(Cache Breakdown)?

定義: 一個熱點數據過期,同時大量請求訪問,導致瞬間打到數據庫。

典型場景

時刻    請求 1              請求 2              請求 3          數據庫
T1      查緩存 → 沒有
T2                          查緩存 → 沒有
T3                                              查緩存 → 沒有
T4      查數據庫 →                                              負載 +1
T5                          查數據庫 →                          負載 +2
T6                                              查數據庫 →      負載 +3
...     (1000 個並發請求同時查數據庫)                          負載 +1000 💥

例子:

  • 明星突然宣布結婚,個人主頁緩存剛好過期
  • 秒殺活動開始,商品緩存剛好過期
  • 熱門新聞緩存過期

✅ 解決方案 2-1:互斥鎖(Mutex Lock)

基本思路

只允許一個請求查詢數據庫,其他請求等待。

Django + Redis 實現

import time
from django.core.cache import cache

def get_product_with_lock(product_id):
    """使用互斥鎖防止緩存擊穿"""
    cache_key = f'product:{product_id}'
    lock_key = f'lock:product:{product_id}'

    # 1. 嘗試從緩存獲取
    product = cache.get(cache_key)
    if product is not None:
        return product

    # 2. 嘗試獲取鎖(使用 Redis SETNX)
    lock_acquired = cache.add(lock_key, 'locked', timeout=10)

    if lock_acquired:
        # 成功獲取鎖,查詢數據庫
        try:
            product = Product.objects.get(id=product_id)
            cache.set(cache_key, product, timeout=300)
            return product
        except Product.DoesNotExist:
            cache.set(cache_key, 'NULL', timeout=60)
            return None
        finally:
            # 釋放鎖
            cache.delete(lock_key)
    else:
        # 未獲取鎖,等待並重試
        time.sleep(0.1)
        return get_product_with_lock(product_id)  # 遞歸重試

更安全的實現(使用 django-redis)

from django_redis import get_redis_connection
import uuid

def get_product_with_distributed_lock(product_id):
    """使用分布式鎖(Distributed Lock)"""
    cache_key = f'product:{product_id}'
    lock_key = f'lock:product:{product_id}'
    redis_conn = get_redis_connection('default')

    # 1. 嘗試從緩存獲取
    product = cache.get(cache_key)
    if product is not None:
        return product

    # 2. 獲取分布式鎖
    lock_value = str(uuid.uuid4())
    max_retries = 3
    retry_delay = 0.05

    for attempt in range(max_retries):
        # 使用 SET NX EX 原子操作
        if redis_conn.set(lock_key, lock_value, nx=True, ex=10):
            try:
                # 雙重檢查緩存
                product = cache.get(cache_key)
                if product is not None:
                    return product

                # 查詢數據庫
                product = Product.objects.get(id=product_id)
                cache.set(cache_key, product, timeout=300)
                return product
            except Product.DoesNotExist:
                cache.set(cache_key, 'NULL', timeout=60)
                return None
            finally:
                # 安全釋放鎖(只刪除自己的鎖)
                lua_script = """
                if redis.call("get", KEYS[1]) == ARGV[1] then
                    return redis.call("del", KEYS[1])
                else
                    return 0
                end
                """
                redis_conn.eval(lua_script, 1, lock_key, lock_value)
        else:
            # 等待後重試
            time.sleep(retry_delay)

    # 重試失敗,直接查詢數據庫(降級)
    try:
        return Product.objects.get(id=product_id)
    except Product.DoesNotExist:
        return None

✅ 解決方案 2-2:邏輯過期(Logical Expiration)

基本思路

數據永不過期,但在數據中記錄過期時間,後台異步更新。

import time
import threading
from django.core.cache import cache

class LogicalExpireCache:
    @staticmethod
    def set_with_logical_expire(key, value, expire_seconds):
        """設置帶邏輯過期時間的緩存"""
        data = {
            'value': value,
            'expire_time': time.time() + expire_seconds
        }
        # 永不過期
        cache.set(key, data, timeout=None)

    @staticmethod
    def get_with_logical_expire(key, rebuild_func, expire_seconds):
        """獲取帶邏輯過期的緩存"""
        data = cache.get(key)

        # 緩存不存在
        if data is None:
            # 獲取鎖並重建
            lock_key = f'lock:{key}'
            if cache.add(lock_key, 'locked', timeout=10):
                try:
                    value = rebuild_func()
                    LogicalExpireCache.set_with_logical_expire(
                        key, value, expire_seconds
                    )
                    return value
                finally:
                    cache.delete(lock_key)
            else:
                # 等待其他線程重建
                time.sleep(0.05)
                return LogicalExpireCache.get_with_logical_expire(
                    key, rebuild_func, expire_seconds
                )

        # 檢查是否邏輯過期
        if time.time() > data['expire_time']:
            # 已過期,異步重建
            lock_key = f'lock:{key}'
            if cache.add(lock_key, 'locked', timeout=10):
                # 啟動後台線程重建緩存
                def rebuild():
                    try:
                        value = rebuild_func()
                        LogicalExpireCache.set_with_logical_expire(
                            key, value, expire_seconds
                        )
                    finally:
                        cache.delete(lock_key)

                thread = threading.Thread(target=rebuild)
                thread.start()

        # 返回舊數據(即使過期)
        return data['value']

# 使用
def get_hot_product(product_id):
    def rebuild():
        return Product.objects.get(id=product_id)

    return LogicalExpireCache.get_with_logical_expire(
        key=f'product:{product_id}',
        rebuild_func=rebuild,
        expire_seconds=300
    )

🔴 問題 3:緩存雪崩(Cache Avalanche)

什麼是緩存雪崩(Cache Avalanche)?

簡單說: 大量 key 同時過期,導致大量請求同時打到數據庫

定義: 大量緩存同時過期,或緩存服務器宕機,導致所有請求打到數據庫。


典型場景

場景 1:啟動時批量寫入緩存

# 系統啟動時,預熱緩存
def warmup_cache():
    products = Product.objects.all()

    for product in products:
        # ❌ 問題:所有商品都設定 1 小時過期
        cache.set(f'product:{product.id}', product, timeout=3600)

# 時間軸:
# 00:00 - 系統啟動,寫入 10000 個商品緩存
# 01:00 - 10000 個 key 同時過期 ⏰💥

結果:

01:00:00 - 10000 個緩存同時失效

接下來的請求:
Request 1: Redis miss → 查 DB
Request 2: Redis miss → 查 DB
Request 3: Redis miss → 查 DB
...
Request 10000: Redis miss → 查 DB

數據庫:💥💥💥 瞬間被打爆

場景 2:定時任務批量刷新

# 每天凌晨 0 點更新所有商品緩存
def refresh_all_products():
    products = Product.objects.all()

    for product in products:
        # ❌ 所有商品都在 0 點過期
        cache.set(f'product:{product.id}', product, timeout=86400)  # 24 小時

# 問題:
# 第一天 00:00 - 寫入緩存
# 第二天 00:00 - 全部過期 💥
# 第三天 00:00 - 全部過期 💥
# ...週而復始

場景 3:促銷活動結束

# 雙 11 活動:商品特價信息
def set_sale_price(product_id, sale_price):
    from datetime import datetime

    # 所有促銷商品都設定相同過期時間
    expire_time = datetime(2024, 11, 12, 0, 0, 0)  # 11/12 凌晨結束
    ttl = (expire_time - datetime.now()).total_seconds()

    cache.set(f'sale:{product_id}', sale_price, timeout=int(ttl))

# 問題:
# 11/12 00:00 - 10000 個促銷商品緩存同時過期 💥

雪崩的嚴重後果

1. 數據庫崩潰 💥

正常情況:

┌─────────────────┐
│ Redis           │  99% 請求命中
│ QPS: 100,000    │
└─────────────────┘
         ↓ 1%
┌─────────────────┐
│ Database        │
│ QPS: 1,000      │  輕鬆應付
└─────────────────┘

雪崩發生:

┌─────────────────┐
│ Redis           │  0% 命中(全部過期)
│ QPS: 100,000    │
└─────────────────┘
         ↓ 100%
┌─────────────────┐
│ Database        │
│ QPS: 100,000    │  💥 瞬間崩潰
└─────────────────┘

2. 災難時間線

23:59:59 - 一切正常,QPS 10 萬
00:00:00 - 促銷緩存全部過期 ⏰
00:00:01 - 數據庫 QPS 從 1000 暴增到 100,000
00:00:05 - 數據庫連接池耗盡
00:00:10 - 應用服務器開始超時
00:00:30 - 整個網站無法訪問
00:05:00 - 緊急重啟,損失數百萬營收

根本原因: 緩存雪崩


✅ 解決方案

方案 1:過期時間加隨機值 ⭐⭐⭐⭐⭐(最常用)

概念: 讓 key 的過期時間分散開,不要同時過期

import random
from django.core.cache import cache

# ❌ 錯誤:所有 key 都是 1 小時過期
for product in products:
    cache.set(f'product:{product.id}', product, timeout=3600)

# ✅ 正確:過期時間加隨機值
for product in products:
    # 基礎時間 1 小時 + 隨機 0-300 秒
    expire_time = 3600 + random.randint(0, 300)
    cache.set(f'product:{product.id}', product, timeout=expire_time)

效果對比:

沒有隨機值:
00:00 - 10000 個 key 同時過期 💥
01:00 - 10000 個 key 同時過期 💥
02:00 - 10000 個 key 同時過期 💥

有隨機值:
01:00:00 - 20 個 key 過期
01:00:01 - 15 個 key 過期
01:00:02 - 18 個 key 過期
01:00:03 - 22 個 key 過期
...
01:05:00 - 所有 key 陸續過期完畢 ✓

結果:過期時間分散在 5 分鐘內,數據庫壓力平穩

實際代碼:

def set_cache_with_random_expire(key, value, base_expire):
    """設置緩存,過期時間加隨機值"""
    # 隨機範圍:基礎時間的 10%
    random_range = int(base_expire * 0.1)
    expire_time = base_expire + random.randint(0, random_range)

    cache.set(key, value, timeout=expire_time)

# 使用
set_cache_with_random_expire('product:123', product_data, 3600)
# 實際過期時間:3600-3960 秒(1-1.1 小時)

方案 2:互斥鎖(和擊穿方案一樣)⭐⭐⭐⭐

import time
import uuid
from django_redis import get_redis_connection

def get_product(product_id):
    cache_key = f'product:{product_id}'

    # 查緩存
    product = cache.get(cache_key)
    if product:
        return product

    # 嘗試獲取鎖
    redis_conn = get_redis_connection('default')
    lock_key = f'lock:product:{product_id}'
    lock_value = str(uuid.uuid4())

    if redis_conn.set(lock_key, lock_value, nx=True, ex=10):
        try:
            # 雙重檢查
            product = cache.get(cache_key)
            if product:
                return product

            # 查詢數據庫
            product = Product.objects.get(id=product_id)

            # 寫入緩存(加隨機值)
            expire = 3600 + random.randint(0, 300)
            cache.set(cache_key, product, timeout=expire)

            return product
        finally:
            # 安全釋放鎖
            lua_script = """
            if redis.call("get", KEYS[1]) == ARGV[1] then
                return redis.call("del", KEYS[1])
            else
                return 0
            end
            """
            redis_conn.eval(lua_script, 1, lock_key, lock_value)
    else:
        # 沒拿到鎖,等待後重試
        time.sleep(0.05)
        return get_product(product_id)

效果:

即使 10000 個 key 同時過期:

Product 1: Request A 拿到鎖 → 查 DB → 寫緩存
           Request B, C, D... 等待

Product 2: Request E 拿到鎖 → 查 DB → 寫緩存
           Request F, G, H... 等待

...

結果:同時只有 N 個請求查數據庫(N = key 的數量)
而不是 M 個請求(M = 用戶請求總數)

方案 3:多級緩存 ⭐⭐⭐

概念: 本地緩存 + Redis,即使 Redis 掛了也有本地緩存頂著

import time

# 本地緩存(進程內內存)
local_cache = {}

def get_product(product_id):
    # 第 1 層:本地緩存(最快)
    cache_key = f'product:{product_id}'

    if cache_key in local_cache:
        data, expire_time = local_cache[cache_key]
        if time.time() < expire_time:
            return data

    # 第 2 層:Redis 緩存
    product = cache.get(cache_key)
    if product:
        # 寫入本地緩存(30 秒)
        local_cache[cache_key] = (product, time.time() + 30)
        return product

    # 第 3 層:數據庫
    product = Product.objects.get(id=product_id)

    # 寫入 Redis
    expire = 3600 + random.randint(0, 300)
    cache.set(cache_key, product, timeout=expire)

    # 寫入本地緩存
    local_cache[cache_key] = (product, time.time() + 30)

    return product

效果:

Redis 雪崩時:

Request 1: 本地緩存 miss → Redis miss → 查 DB
Request 2: 本地緩存 hit ✓(30 秒內)
Request 3: 本地緩存 hit ✓
Request 4: 本地緩存 hit ✓
...

結果:即使 Redis 失效,本地緩存還能撐 30 秒

架構圖:

用戶請求
    ↓
┌─────────────────┐
│ 本地緩存(30秒) │ ← 第 1 層防護
└─────────────────┘
    ↓ miss
┌─────────────────┐
│ Redis(1 小時)  │ ← 第 2 層防護
└─────────────────┘
    ↓ miss
┌─────────────────┐
│ 數據庫          │ ← 最後防線
└─────────────────┘

方案 4:限流降級 ⭐⭐⭐⭐

概念: 當數據庫壓力過大時,限制請求或降級服務

from threading import Semaphore

# 限制同時訪問數據庫的請求數
db_semaphore = Semaphore(100)  # 最多 100 個並發

def get_product(product_id):
    cache_key = f'product:{product_id}'

    # 查 Redis
    product = cache.get(cache_key)
    if product:
        return product

    # Redis miss,需要查 DB
    if not db_semaphore.acquire(blocking=False):
        # 無法獲取信號量,數據庫壓力太大
        # 降級:返回默認值或提示
        return {
            'id': product_id,
            'name': '商品載入中...',
            'price': 0,
            'available': False
        }

    try:
        product = Product.objects.get(id=product_id)

        expire = 3600 + random.randint(0, 300)
        cache.set(cache_key, product, timeout=expire)

        return product
    finally:
        db_semaphore.release()

進階:熔斷器模式

class CircuitBreaker:
    def __init__(self, threshold=10, timeout=60):
        self.failure_count = 0
        self.threshold = threshold  # 失敗次數閾值
        self.timeout = timeout
        self.last_failure_time = 0
        self.state = 'CLOSED'  # CLOSED, OPEN, HALF_OPEN

    def call(self, func, *args, **kwargs):
        if self.state == 'OPEN':
            # 熔斷器打開,檢查是否可以嘗試恢復
            if time.time() - self.last_failure_time > self.timeout:
                self.state = 'HALF_OPEN'
            else:
                raise Exception("Circuit breaker is OPEN")

        try:
            result = func(*args, **kwargs)

            # 成功,重置計數器
            if self.state == 'HALF_OPEN':
                self.state = 'CLOSED'
            self.failure_count = 0

            return result
        except Exception as e:
            self.failure_count += 1
            self.last_failure_time = time.time()

            # 達到閾值,打開熔斷器
            if self.failure_count >= self.threshold:
                self.state = 'OPEN'

            raise e

# 使用
db_breaker = CircuitBreaker(threshold=10, timeout=60)

def get_product_from_db(product_id):
    try:
        return db_breaker.call(
            Product.objects.get,
            id=product_id
        )
    except:
        # 熔斷器打開,返回降級數據
        return get_fallback_data(product_id)

方案 5:緩存預熱 ⭐⭐⭐

概念: 在系統啟動或低峰期提前載入緩存

def warmup_cache():
    """系統啟動時預熱緩存"""
    print("開始預熱緩存...")

    # 獲取熱門商品
    hot_products = Product.objects.filter(is_hot=True)

    for i, product in enumerate(hot_products):
        # ✅ 分批載入,避免同時過期
        batch = i // 100  # 每 100 個一批
        delay = batch * 60  # 每批間隔 1 分鐘

        base_expire = 3600 + delay
        random_expire = base_expire + random.randint(0, 300)

        cache.set(
            f'product:{product.id}',
            product,
            timeout=random_expire
        )

    print("緩存預熱完成!")

# 在系統啟動時執行
warmup_cache()

定時預熱:

from celery import Celery
from celery.schedules import crontab

app = Celery('tasks')

@app.task
def refresh_hot_products():
    """每小時刷新熱門商品緩存(錯開時間)"""
    hot_products = get_hot_products()

    for i, product_id in enumerate(hot_products):
        # 錯開刷新時間
        delay = i * 2  # 每個商品間隔 2 秒

        refresh_product_cache.apply_async(
            args=[product_id],
            countdown=delay
        )

@app.task
def refresh_product_cache(product_id):
    product = Product.objects.get(id=product_id)
    expire = 3600 + random.randint(0, 300)
    cache.set(f'product:{product_id}', product, timeout=expire)

# 設定定時任務
app.conf.beat_schedule = {
    'refresh-hot-products': {
        'task': 'tasks.refresh_hot_products',
        'schedule': 3600.0,  # 每小時執行
    },
}

方案 6:Redis 持久化 + 高可用 ⭐⭐⭐⭐

概念: 即使 Redis 重啟,數據也不會全部丟失

redis.conf 配置:

# 1. RDB 持久化(快照)
save 900 1      # 900 秒內至少 1 個 key 變化就儲存
save 300 10     # 300 秒內至少 10 個 key 變化就儲存
save 60 10000   # 60 秒內至少 10000 個 key 變化就儲存

# 2. AOF 持久化(日誌)
appendonly yes
appendfsync everysec  # 每秒同步一次

# 3. 主從複製
replicaof <master-ip> <master-port>

# 4. Sentinel 高可用
sentinel monitor mymaster 127.0.0.1 6379 2

效果:

Redis 重啟:
├─ 有 RDB/AOF → 數據恢復 ✓
├─ 部分緩存失效(最近的)
└─ 不會全部雪崩

Redis 主節點掛了:
├─ Sentinel 自動切換到從節點
├─ 服務幾乎不中斷
└─ 避免雪崩

📊 三大問題對比總結

問題原因特徵最佳方案
穿透 (Penetration)查詢不存在的數據持續打數據庫布隆過濾器 (Bloom Filter) + 緩存空值
擊穿 (Breakdown)熱點數據過期瞬間高並發互斥鎖 (Mutex Lock) + 邏輯過期 (Logical Expiration)
雪崩 (Avalanche)大量緩存同時過期數據庫崩潰隨機過期 + 多級緩存 (Multi-level Cache) + 熔斷 (Circuit Breaker)

💡 完整解決方案

from django.core.cache import cache
from django_redis import get_redis_connection
import random
import time
import uuid

class CacheManager:
    """統一緩存管理器"""

    def __init__(self, bloom_filter=None):
        self.bloom_filter = bloom_filter
        self.redis_conn = get_redis_connection('default')

    def get_with_protection(self, key, rebuild_func, timeout=300):
        """
        獲取緩存(防穿透、擊穿、雪崩)

        Args:
            key: 緩存鍵
            rebuild_func: 重建數據的函數
            timeout: 基礎超時時間
        """
        # 1. 防穿透:檢查布隆過濾器
        if self.bloom_filter and not self.bloom_filter.exists(key):
            return None

        # 2. 查緩存
        value = cache.get(key)
        if value is not None:
            if value == 'NULL':
                return None
            return value

        # 3. 防擊穿:分布式鎖
        lock_key = f'lock:{key}'
        lock_value = str(uuid.uuid4())

        if self.redis_conn.set(lock_key, lock_value, nx=True, ex=10):
            try:
                # 雙重檢查
                value = cache.get(key)
                if value is not None:
                    return value if value != 'NULL' else None

                # 重建數據
                try:
                    value = rebuild_func()
                    if value is None:
                        cache.set(key, 'NULL', timeout=60)
                        return None

                    # 防雪崩:隨機過期時間
                    random_timeout = timeout + random.randint(-timeout // 5, timeout // 5)
                    cache.set(key, value, timeout=random_timeout)
                    return value
                except Exception as e:
                    # 記錄錯誤
                    print(f"Rebuild failed: {e}")
                    return None
            finally:
                # 安全釋放鎖
                lua_script = """
                if redis.call("get", KEYS[1]) == ARGV[1] then
                    return redis.call("del", KEYS[1])
                else
                    return 0
                end
                """
                self.redis_conn.eval(lua_script, 1, lock_key, lock_value)
        else:
            # 等待並重試
            time.sleep(0.05)
            return cache.get(key)

# 使用
cache_manager = CacheManager(bloom_filter=bloom_filter)

def get_product(product_id):
    return cache_manager.get_with_protection(
        key=f'product:{product_id}',
        rebuild_func=lambda: Product.objects.get(id=product_id),
        timeout=300
    )

💡 面試要點

Q1: 緩存穿透(Cache Penetration)和緩存擊穿(Cache Breakdown)有什麼區別?

答:

  • 穿透 (Penetration):查詢不存在的數據,持續打數據庫
  • 擊穿 (Breakdown)熱點數據過期,瞬間高並發打數據庫

Q2: 布隆過濾器(Bloom Filter)為什麼能防穿透?

答:

  • 布隆過濾器可以快速判斷數據一定不存在
  • 不存在的直接返回,不查數據庫
  • 可能存在的才查詢(允許小概率誤判)

Q3: 互斥鎖(Mutex Lock)會降低性能嗎?

答:

  • 會有輕微影響,但只在緩存失效瞬間
  • 相比數據庫崩潰,這點性能損失可接受
  • 可以用「邏輯過期(Logical Expiration)」優化,返回舊數據

Q4: 如何防止緩存雪崩(Cache Avalanche)?

答:

  1. 隨機過期時間(Random Expiration Time):分散過期
  2. 緩存預熱(Cache Warming):提前加載
  3. 多級緩存(Multi-level Cache):本地 + Redis
  4. 熔斷降級(Circuit Breaker):保護數據庫

🔗 下一篇

在下一篇文章中,我們將學習 緩存更新策略,探討如何保證緩存與數據庫的一致性。

閱讀時間:10 分鐘

0%