Django 面試準備 09-3:緩存穿透、擊穿、雪崩
深入理解三大緩存問題與完整解決方案
目錄
09-3. 緩存穿透、擊穿、雪崩(Cache Penetration, Breakdown, Avalanche)
📌 三大緩存問題概述
在高並發系統中,緩存是性能的關鍵。但不當使用會引發嚴重問題:
| 問題 | 原因 | 影響 | 嚴重程度 |
|---|---|---|---|
| 穿透 (Penetration) | 查詢不存在的數據 | 每次都打到數據庫 | ⚠️⚠️⚠️ |
| 擊穿 (Breakdown) | 熱點數據過期 | 瞬間大量請求打到數據庫 | ⚠️⚠️⚠️⚠️ |
| 雪崩 (Avalanche) | 大量緩存同時過期 | 數據庫瞬間崩潰 | ⚠️⚠️⚠️⚠️⚠️ |
🔴 問題 1:緩存穿透(Cache Penetration)
什麼是緩存穿透(Cache Penetration)?
定義: 查詢一個不存在的數據,緩存和數據庫都沒有,導致每次請求都打到數據庫。
典型場景
# 用戶查詢 product_id = 99999(不存在)
def get_product(product_id):
# 1. 查緩存 → 沒有
product = cache.get(f'product:{product_id}')
if product:
return product
# 2. 查數據庫 → 沒有
try:
product = Product.objects.get(id=product_id)
cache.set(f'product:{product_id}', product, 300)
return product
except Product.DoesNotExist:
return None # ⚠️ 沒有緩存空結果!攻擊演示
# 駭客攻擊:故意查詢不存在的資料
for i in range(100000):
# 故意用不存在的 ID
requests.get(f'/api/user?id=-{i}')
# 結果:
# - 10 萬次請求
# - 全部打到資料庫(Redis 沒用)
# - 資料庫被打爆 💥後果:
資料庫壓力:
正常: ██ 10% CPU
被攻擊: ████████████████████ 100% CPU 💥
網站:
正常: 回應時間 100ms
被攻擊: 回應時間 10 秒 → 超時 → 掛掉 ✗真實影響:
- 💥 數據庫 CPU 飆升至 100%
- 🐌 正常請求也變慢(連帶影響)
- ❌ 用戶無法訪問網站
- 💰 可能觸發 DDoS 警報,產生額外費用
✅ 解決方案 1-1:緩存空值
from django.core.cache import cache
def get_product_v1(product_id):
"""緩存空值方案"""
cache_key = f'product:{product_id}'
product = cache.get(cache_key)
# 命中緩存(包括空值)
if product is not None:
if product == 'NULL':
return None
return product
# 查詢數據庫
try:
product = Product.objects.get(id=product_id)
# 緩存 5 分鐘
cache.set(cache_key, product, timeout=300)
return product
except Product.DoesNotExist:
# ✅ 緩存空值,但設置較短過期時間
cache.set(cache_key, 'NULL', timeout=60)
return None優點:
- ✅ 簡單易實現
- ✅ 防止重複查詢
缺點:
- ⚠️ 佔用緩存空間
- ⚠️ 如果攻擊大量不同 ID,仍會佔用大量內存
✅ 解決方案 1-2:布隆過濾器(Bloom Filter)
什麼是布隆過濾器(Bloom Filter)?
布隆過濾器(Bloom Filter) 是一種概率型數據結構,用於判斷元素是否不存在。
特性:
- ✅ 如果返回「不存在」,一定不存在
- ⚠️ 如果返回「可能存在」,可能誤判
Django + Redis 實現
from redis import Redis
from redis.commands.bf import BFBloom
class BloomFilter:
def __init__(self):
self.redis_client = Redis(host='localhost', port=6379, db=0)
self.bf = BFBloom(self.redis_client)
self.key = 'product:bloom_filter'
def init_filter(self):
"""初始化:將所有存在的 ID 加入過濾器"""
# 創建布隆過濾器(預計 100 萬條數據,錯誤率 0.01)
self.bf.create(self.key, capacity=1000000, error_rate=0.01)
# 將所有商品 ID 加入過濾器
product_ids = Product.objects.values_list('id', flat=True)
for product_id in product_ids:
self.bf.add(self.key, product_id)
def exists(self, product_id):
"""檢查 ID 是否可能存在"""
return self.bf.exists(self.key, product_id)
def add(self, product_id):
"""添加新 ID"""
self.bf.add(self.key, product_id)
# 全局實例
bloom_filter = BloomFilter()
def get_product_v2(product_id):
"""使用布隆過濾器"""
# 1. 先檢查布隆過濾器
if not bloom_filter.exists(product_id):
# 確定不存在,直接返回
return None
# 2. 可能存在,查緩存
cache_key = f'product:{product_id}'
product = cache.get(cache_key)
if product is not None:
return product
# 3. 查數據庫
try:
product = Product.objects.get(id=product_id)
cache.set(cache_key, product, timeout=300)
return product
except Product.DoesNotExist:
# 誤判(很少發生),緩存空值
cache.set(cache_key, 'NULL', timeout=60)
return None在模型信號中更新布隆過濾器
from django.db.models.signals import post_save, post_delete
from django.dispatch import receiver
@receiver(post_save, sender=Product)
def add_to_bloom_filter(sender, instance, created, **kwargs):
"""新增商品時更新布隆過濾器"""
if created:
bloom_filter.add(instance.id)
@receiver(post_delete, sender=Product)
def remove_from_cache(sender, instance, **kwargs):
"""刪除商品時清除緩存"""
cache.delete(f'product:{instance.id}')
# 注意:布隆過濾器不支持刪除,只能重建🔴 問題 2:緩存擊穿(Cache Breakdown / Hotkey Expiration)
什麼是緩存擊穿(Cache Breakdown)?
定義: 一個熱點數據過期,同時大量請求訪問,導致瞬間打到數據庫。
典型場景
時刻 請求 1 請求 2 請求 3 數據庫
T1 查緩存 → 沒有
T2 查緩存 → 沒有
T3 查緩存 → 沒有
T4 查數據庫 → 負載 +1
T5 查數據庫 → 負載 +2
T6 查數據庫 → 負載 +3
... (1000 個並發請求同時查數據庫) 負載 +1000 💥例子:
- 明星突然宣布結婚,個人主頁緩存剛好過期
- 秒殺活動開始,商品緩存剛好過期
- 熱門新聞緩存過期
✅ 解決方案 2-1:互斥鎖(Mutex Lock)
基本思路
只允許一個請求查詢數據庫,其他請求等待。
Django + Redis 實現
import time
from django.core.cache import cache
def get_product_with_lock(product_id):
"""使用互斥鎖防止緩存擊穿"""
cache_key = f'product:{product_id}'
lock_key = f'lock:product:{product_id}'
# 1. 嘗試從緩存獲取
product = cache.get(cache_key)
if product is not None:
return product
# 2. 嘗試獲取鎖(使用 Redis SETNX)
lock_acquired = cache.add(lock_key, 'locked', timeout=10)
if lock_acquired:
# 成功獲取鎖,查詢數據庫
try:
product = Product.objects.get(id=product_id)
cache.set(cache_key, product, timeout=300)
return product
except Product.DoesNotExist:
cache.set(cache_key, 'NULL', timeout=60)
return None
finally:
# 釋放鎖
cache.delete(lock_key)
else:
# 未獲取鎖,等待並重試
time.sleep(0.1)
return get_product_with_lock(product_id) # 遞歸重試更安全的實現(使用 django-redis)
from django_redis import get_redis_connection
import uuid
def get_product_with_distributed_lock(product_id):
"""使用分布式鎖(Distributed Lock)"""
cache_key = f'product:{product_id}'
lock_key = f'lock:product:{product_id}'
redis_conn = get_redis_connection('default')
# 1. 嘗試從緩存獲取
product = cache.get(cache_key)
if product is not None:
return product
# 2. 獲取分布式鎖
lock_value = str(uuid.uuid4())
max_retries = 3
retry_delay = 0.05
for attempt in range(max_retries):
# 使用 SET NX EX 原子操作
if redis_conn.set(lock_key, lock_value, nx=True, ex=10):
try:
# 雙重檢查緩存
product = cache.get(cache_key)
if product is not None:
return product
# 查詢數據庫
product = Product.objects.get(id=product_id)
cache.set(cache_key, product, timeout=300)
return product
except Product.DoesNotExist:
cache.set(cache_key, 'NULL', timeout=60)
return None
finally:
# 安全釋放鎖(只刪除自己的鎖)
lua_script = """
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("del", KEYS[1])
else
return 0
end
"""
redis_conn.eval(lua_script, 1, lock_key, lock_value)
else:
# 等待後重試
time.sleep(retry_delay)
# 重試失敗,直接查詢數據庫(降級)
try:
return Product.objects.get(id=product_id)
except Product.DoesNotExist:
return None✅ 解決方案 2-2:邏輯過期(Logical Expiration)
基本思路
數據永不過期,但在數據中記錄過期時間,後台異步更新。
import time
import threading
from django.core.cache import cache
class LogicalExpireCache:
@staticmethod
def set_with_logical_expire(key, value, expire_seconds):
"""設置帶邏輯過期時間的緩存"""
data = {
'value': value,
'expire_time': time.time() + expire_seconds
}
# 永不過期
cache.set(key, data, timeout=None)
@staticmethod
def get_with_logical_expire(key, rebuild_func, expire_seconds):
"""獲取帶邏輯過期的緩存"""
data = cache.get(key)
# 緩存不存在
if data is None:
# 獲取鎖並重建
lock_key = f'lock:{key}'
if cache.add(lock_key, 'locked', timeout=10):
try:
value = rebuild_func()
LogicalExpireCache.set_with_logical_expire(
key, value, expire_seconds
)
return value
finally:
cache.delete(lock_key)
else:
# 等待其他線程重建
time.sleep(0.05)
return LogicalExpireCache.get_with_logical_expire(
key, rebuild_func, expire_seconds
)
# 檢查是否邏輯過期
if time.time() > data['expire_time']:
# 已過期,異步重建
lock_key = f'lock:{key}'
if cache.add(lock_key, 'locked', timeout=10):
# 啟動後台線程重建緩存
def rebuild():
try:
value = rebuild_func()
LogicalExpireCache.set_with_logical_expire(
key, value, expire_seconds
)
finally:
cache.delete(lock_key)
thread = threading.Thread(target=rebuild)
thread.start()
# 返回舊數據(即使過期)
return data['value']
# 使用
def get_hot_product(product_id):
def rebuild():
return Product.objects.get(id=product_id)
return LogicalExpireCache.get_with_logical_expire(
key=f'product:{product_id}',
rebuild_func=rebuild,
expire_seconds=300
)🔴 問題 3:緩存雪崩(Cache Avalanche)
什麼是緩存雪崩(Cache Avalanche)?
簡單說: 大量 key 同時過期,導致大量請求同時打到數據庫
定義: 大量緩存同時過期,或緩存服務器宕機,導致所有請求打到數據庫。
典型場景
場景 1:啟動時批量寫入緩存
# 系統啟動時,預熱緩存
def warmup_cache():
products = Product.objects.all()
for product in products:
# ❌ 問題:所有商品都設定 1 小時過期
cache.set(f'product:{product.id}', product, timeout=3600)
# 時間軸:
# 00:00 - 系統啟動,寫入 10000 個商品緩存
# 01:00 - 10000 個 key 同時過期 ⏰💥結果:
01:00:00 - 10000 個緩存同時失效
接下來的請求:
Request 1: Redis miss → 查 DB
Request 2: Redis miss → 查 DB
Request 3: Redis miss → 查 DB
...
Request 10000: Redis miss → 查 DB
數據庫:💥💥💥 瞬間被打爆場景 2:定時任務批量刷新
# 每天凌晨 0 點更新所有商品緩存
def refresh_all_products():
products = Product.objects.all()
for product in products:
# ❌ 所有商品都在 0 點過期
cache.set(f'product:{product.id}', product, timeout=86400) # 24 小時
# 問題:
# 第一天 00:00 - 寫入緩存
# 第二天 00:00 - 全部過期 💥
# 第三天 00:00 - 全部過期 💥
# ...週而復始場景 3:促銷活動結束
# 雙 11 活動:商品特價信息
def set_sale_price(product_id, sale_price):
from datetime import datetime
# 所有促銷商品都設定相同過期時間
expire_time = datetime(2024, 11, 12, 0, 0, 0) # 11/12 凌晨結束
ttl = (expire_time - datetime.now()).total_seconds()
cache.set(f'sale:{product_id}', sale_price, timeout=int(ttl))
# 問題:
# 11/12 00:00 - 10000 個促銷商品緩存同時過期 💥雪崩的嚴重後果
1. 數據庫崩潰 💥
正常情況:
┌─────────────────┐
│ Redis │ 99% 請求命中
│ QPS: 100,000 │
└─────────────────┘
↓ 1%
┌─────────────────┐
│ Database │
│ QPS: 1,000 │ 輕鬆應付
└─────────────────┘雪崩發生:
┌─────────────────┐
│ Redis │ 0% 命中(全部過期)
│ QPS: 100,000 │
└─────────────────┘
↓ 100%
┌─────────────────┐
│ Database │
│ QPS: 100,000 │ 💥 瞬間崩潰
└─────────────────┘2. 災難時間線
23:59:59 - 一切正常,QPS 10 萬
00:00:00 - 促銷緩存全部過期 ⏰
00:00:01 - 數據庫 QPS 從 1000 暴增到 100,000
00:00:05 - 數據庫連接池耗盡
00:00:10 - 應用服務器開始超時
00:00:30 - 整個網站無法訪問
00:05:00 - 緊急重啟,損失數百萬營收根本原因: 緩存雪崩
✅ 解決方案
方案 1:過期時間加隨機值 ⭐⭐⭐⭐⭐(最常用)
概念: 讓 key 的過期時間分散開,不要同時過期
import random
from django.core.cache import cache
# ❌ 錯誤:所有 key 都是 1 小時過期
for product in products:
cache.set(f'product:{product.id}', product, timeout=3600)
# ✅ 正確:過期時間加隨機值
for product in products:
# 基礎時間 1 小時 + 隨機 0-300 秒
expire_time = 3600 + random.randint(0, 300)
cache.set(f'product:{product.id}', product, timeout=expire_time)效果對比:
沒有隨機值:
00:00 - 10000 個 key 同時過期 💥
01:00 - 10000 個 key 同時過期 💥
02:00 - 10000 個 key 同時過期 💥
有隨機值:
01:00:00 - 20 個 key 過期
01:00:01 - 15 個 key 過期
01:00:02 - 18 個 key 過期
01:00:03 - 22 個 key 過期
...
01:05:00 - 所有 key 陸續過期完畢 ✓
結果:過期時間分散在 5 分鐘內,數據庫壓力平穩實際代碼:
def set_cache_with_random_expire(key, value, base_expire):
"""設置緩存,過期時間加隨機值"""
# 隨機範圍:基礎時間的 10%
random_range = int(base_expire * 0.1)
expire_time = base_expire + random.randint(0, random_range)
cache.set(key, value, timeout=expire_time)
# 使用
set_cache_with_random_expire('product:123', product_data, 3600)
# 實際過期時間:3600-3960 秒(1-1.1 小時)方案 2:互斥鎖(和擊穿方案一樣)⭐⭐⭐⭐
import time
import uuid
from django_redis import get_redis_connection
def get_product(product_id):
cache_key = f'product:{product_id}'
# 查緩存
product = cache.get(cache_key)
if product:
return product
# 嘗試獲取鎖
redis_conn = get_redis_connection('default')
lock_key = f'lock:product:{product_id}'
lock_value = str(uuid.uuid4())
if redis_conn.set(lock_key, lock_value, nx=True, ex=10):
try:
# 雙重檢查
product = cache.get(cache_key)
if product:
return product
# 查詢數據庫
product = Product.objects.get(id=product_id)
# 寫入緩存(加隨機值)
expire = 3600 + random.randint(0, 300)
cache.set(cache_key, product, timeout=expire)
return product
finally:
# 安全釋放鎖
lua_script = """
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("del", KEYS[1])
else
return 0
end
"""
redis_conn.eval(lua_script, 1, lock_key, lock_value)
else:
# 沒拿到鎖,等待後重試
time.sleep(0.05)
return get_product(product_id)效果:
即使 10000 個 key 同時過期:
Product 1: Request A 拿到鎖 → 查 DB → 寫緩存
Request B, C, D... 等待
Product 2: Request E 拿到鎖 → 查 DB → 寫緩存
Request F, G, H... 等待
...
結果:同時只有 N 個請求查數據庫(N = key 的數量)
而不是 M 個請求(M = 用戶請求總數)方案 3:多級緩存 ⭐⭐⭐
概念: 本地緩存 + Redis,即使 Redis 掛了也有本地緩存頂著
import time
# 本地緩存(進程內內存)
local_cache = {}
def get_product(product_id):
# 第 1 層:本地緩存(最快)
cache_key = f'product:{product_id}'
if cache_key in local_cache:
data, expire_time = local_cache[cache_key]
if time.time() < expire_time:
return data
# 第 2 層:Redis 緩存
product = cache.get(cache_key)
if product:
# 寫入本地緩存(30 秒)
local_cache[cache_key] = (product, time.time() + 30)
return product
# 第 3 層:數據庫
product = Product.objects.get(id=product_id)
# 寫入 Redis
expire = 3600 + random.randint(0, 300)
cache.set(cache_key, product, timeout=expire)
# 寫入本地緩存
local_cache[cache_key] = (product, time.time() + 30)
return product效果:
Redis 雪崩時:
Request 1: 本地緩存 miss → Redis miss → 查 DB
Request 2: 本地緩存 hit ✓(30 秒內)
Request 3: 本地緩存 hit ✓
Request 4: 本地緩存 hit ✓
...
結果:即使 Redis 失效,本地緩存還能撐 30 秒架構圖:
用戶請求
↓
┌─────────────────┐
│ 本地緩存(30秒) │ ← 第 1 層防護
└─────────────────┘
↓ miss
┌─────────────────┐
│ Redis(1 小時) │ ← 第 2 層防護
└─────────────────┘
↓ miss
┌─────────────────┐
│ 數據庫 │ ← 最後防線
└─────────────────┘方案 4:限流降級 ⭐⭐⭐⭐
概念: 當數據庫壓力過大時,限制請求或降級服務
from threading import Semaphore
# 限制同時訪問數據庫的請求數
db_semaphore = Semaphore(100) # 最多 100 個並發
def get_product(product_id):
cache_key = f'product:{product_id}'
# 查 Redis
product = cache.get(cache_key)
if product:
return product
# Redis miss,需要查 DB
if not db_semaphore.acquire(blocking=False):
# 無法獲取信號量,數據庫壓力太大
# 降級:返回默認值或提示
return {
'id': product_id,
'name': '商品載入中...',
'price': 0,
'available': False
}
try:
product = Product.objects.get(id=product_id)
expire = 3600 + random.randint(0, 300)
cache.set(cache_key, product, timeout=expire)
return product
finally:
db_semaphore.release()進階:熔斷器模式
class CircuitBreaker:
def __init__(self, threshold=10, timeout=60):
self.failure_count = 0
self.threshold = threshold # 失敗次數閾值
self.timeout = timeout
self.last_failure_time = 0
self.state = 'CLOSED' # CLOSED, OPEN, HALF_OPEN
def call(self, func, *args, **kwargs):
if self.state == 'OPEN':
# 熔斷器打開,檢查是否可以嘗試恢復
if time.time() - self.last_failure_time > self.timeout:
self.state = 'HALF_OPEN'
else:
raise Exception("Circuit breaker is OPEN")
try:
result = func(*args, **kwargs)
# 成功,重置計數器
if self.state == 'HALF_OPEN':
self.state = 'CLOSED'
self.failure_count = 0
return result
except Exception as e:
self.failure_count += 1
self.last_failure_time = time.time()
# 達到閾值,打開熔斷器
if self.failure_count >= self.threshold:
self.state = 'OPEN'
raise e
# 使用
db_breaker = CircuitBreaker(threshold=10, timeout=60)
def get_product_from_db(product_id):
try:
return db_breaker.call(
Product.objects.get,
id=product_id
)
except:
# 熔斷器打開,返回降級數據
return get_fallback_data(product_id)方案 5:緩存預熱 ⭐⭐⭐
概念: 在系統啟動或低峰期提前載入緩存
def warmup_cache():
"""系統啟動時預熱緩存"""
print("開始預熱緩存...")
# 獲取熱門商品
hot_products = Product.objects.filter(is_hot=True)
for i, product in enumerate(hot_products):
# ✅ 分批載入,避免同時過期
batch = i // 100 # 每 100 個一批
delay = batch * 60 # 每批間隔 1 分鐘
base_expire = 3600 + delay
random_expire = base_expire + random.randint(0, 300)
cache.set(
f'product:{product.id}',
product,
timeout=random_expire
)
print("緩存預熱完成!")
# 在系統啟動時執行
warmup_cache()定時預熱:
from celery import Celery
from celery.schedules import crontab
app = Celery('tasks')
@app.task
def refresh_hot_products():
"""每小時刷新熱門商品緩存(錯開時間)"""
hot_products = get_hot_products()
for i, product_id in enumerate(hot_products):
# 錯開刷新時間
delay = i * 2 # 每個商品間隔 2 秒
refresh_product_cache.apply_async(
args=[product_id],
countdown=delay
)
@app.task
def refresh_product_cache(product_id):
product = Product.objects.get(id=product_id)
expire = 3600 + random.randint(0, 300)
cache.set(f'product:{product_id}', product, timeout=expire)
# 設定定時任務
app.conf.beat_schedule = {
'refresh-hot-products': {
'task': 'tasks.refresh_hot_products',
'schedule': 3600.0, # 每小時執行
},
}方案 6:Redis 持久化 + 高可用 ⭐⭐⭐⭐
概念: 即使 Redis 重啟,數據也不會全部丟失
redis.conf 配置:
# 1. RDB 持久化(快照)
save 900 1 # 900 秒內至少 1 個 key 變化就儲存
save 300 10 # 300 秒內至少 10 個 key 變化就儲存
save 60 10000 # 60 秒內至少 10000 個 key 變化就儲存
# 2. AOF 持久化(日誌)
appendonly yes
appendfsync everysec # 每秒同步一次
# 3. 主從複製
replicaof <master-ip> <master-port>
# 4. Sentinel 高可用
sentinel monitor mymaster 127.0.0.1 6379 2效果:
Redis 重啟:
├─ 有 RDB/AOF → 數據恢復 ✓
├─ 部分緩存失效(最近的)
└─ 不會全部雪崩
Redis 主節點掛了:
├─ Sentinel 自動切換到從節點
├─ 服務幾乎不中斷
└─ 避免雪崩📊 三大問題對比總結
| 問題 | 原因 | 特徵 | 最佳方案 |
|---|---|---|---|
| 穿透 (Penetration) | 查詢不存在的數據 | 持續打數據庫 | 布隆過濾器 (Bloom Filter) + 緩存空值 |
| 擊穿 (Breakdown) | 熱點數據過期 | 瞬間高並發 | 互斥鎖 (Mutex Lock) + 邏輯過期 (Logical Expiration) |
| 雪崩 (Avalanche) | 大量緩存同時過期 | 數據庫崩潰 | 隨機過期 + 多級緩存 (Multi-level Cache) + 熔斷 (Circuit Breaker) |
💡 完整解決方案
from django.core.cache import cache
from django_redis import get_redis_connection
import random
import time
import uuid
class CacheManager:
"""統一緩存管理器"""
def __init__(self, bloom_filter=None):
self.bloom_filter = bloom_filter
self.redis_conn = get_redis_connection('default')
def get_with_protection(self, key, rebuild_func, timeout=300):
"""
獲取緩存(防穿透、擊穿、雪崩)
Args:
key: 緩存鍵
rebuild_func: 重建數據的函數
timeout: 基礎超時時間
"""
# 1. 防穿透:檢查布隆過濾器
if self.bloom_filter and not self.bloom_filter.exists(key):
return None
# 2. 查緩存
value = cache.get(key)
if value is not None:
if value == 'NULL':
return None
return value
# 3. 防擊穿:分布式鎖
lock_key = f'lock:{key}'
lock_value = str(uuid.uuid4())
if self.redis_conn.set(lock_key, lock_value, nx=True, ex=10):
try:
# 雙重檢查
value = cache.get(key)
if value is not None:
return value if value != 'NULL' else None
# 重建數據
try:
value = rebuild_func()
if value is None:
cache.set(key, 'NULL', timeout=60)
return None
# 防雪崩:隨機過期時間
random_timeout = timeout + random.randint(-timeout // 5, timeout // 5)
cache.set(key, value, timeout=random_timeout)
return value
except Exception as e:
# 記錄錯誤
print(f"Rebuild failed: {e}")
return None
finally:
# 安全釋放鎖
lua_script = """
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("del", KEYS[1])
else
return 0
end
"""
self.redis_conn.eval(lua_script, 1, lock_key, lock_value)
else:
# 等待並重試
time.sleep(0.05)
return cache.get(key)
# 使用
cache_manager = CacheManager(bloom_filter=bloom_filter)
def get_product(product_id):
return cache_manager.get_with_protection(
key=f'product:{product_id}',
rebuild_func=lambda: Product.objects.get(id=product_id),
timeout=300
)💡 面試要點
Q1: 緩存穿透(Cache Penetration)和緩存擊穿(Cache Breakdown)有什麼區別?
答:
- 穿透 (Penetration):查詢不存在的數據,持續打數據庫
- 擊穿 (Breakdown):熱點數據過期,瞬間高並發打數據庫
Q2: 布隆過濾器(Bloom Filter)為什麼能防穿透?
答:
- 布隆過濾器可以快速判斷數據一定不存在
- 不存在的直接返回,不查數據庫
- 可能存在的才查詢(允許小概率誤判)
Q3: 互斥鎖(Mutex Lock)會降低性能嗎?
答:
- 會有輕微影響,但只在緩存失效瞬間
- 相比數據庫崩潰,這點性能損失可接受
- 可以用「邏輯過期(Logical Expiration)」優化,返回舊數據
Q4: 如何防止緩存雪崩(Cache Avalanche)?
答:
- 隨機過期時間(Random Expiration Time):分散過期
- 緩存預熱(Cache Warming):提前加載
- 多級緩存(Multi-level Cache):本地 + Redis
- 熔斷降級(Circuit Breaker):保護數據庫
🔗 下一篇
在下一篇文章中,我們將學習 緩存更新策略,探討如何保證緩存與數據庫的一致性。
閱讀時間:10 分鐘