Django 面試準備 09-4:緩存更新策略
深入理解緩存與數據庫一致性的各種更新策略
目錄
09-4. 緩存更新策略
📌 核心問題:緩存一致性
緩存與數據庫如何保持一致?
緩存:price = 100
數據庫:price = 100
↓ 用戶修改價格為 150
緩存:price = 100 ⚠️ 舊數據
數據庫:price = 150 ✅ 新數據不一致的後果:
- 用戶看到錯誤的數據
- 訂單金額計算錯誤
- 庫存顯示不準確
🎯 四大更新策略概覽
| 策略 | 誰負責更新 | 流程 | 適用場景 |
|---|---|---|---|
| Cache Aside | 應用程序 | 先更新 DB,再刪除緩存 | ✅ 最常用 |
| Read Through | 緩存層 | 緩存自動從 DB 加載 | 讀多寫少 |
| Write Through | 緩存層 | 緩存自動更新 DB | 強一致性 |
| Write Behind | 緩存層 | 異步更新 DB | 高性能要求 |
🎯 策略 1:Cache Aside(旁路緩存)
最常用的策略
讀流程:
1. 查緩存 → 命中 → 返回
2. 查緩存 → 未命中 → 查數據庫 → 寫入緩存 → 返回寫流程:
1. 更新數據庫
2. 刪除緩存(不是更新!)Django 實現
from django.core.cache import cache
# === 讀操作 ===
def get_product(product_id):
"""讀取商品"""
cache_key = f'product:{product_id}'
# 1. 查緩存
product = cache.get(cache_key)
if product is not None:
return product
# 2. 查數據庫
try:
product = Product.objects.get(id=product_id)
# 3. 寫入緩存
cache.set(cache_key, product, timeout=300)
return product
except Product.DoesNotExist:
return None
# === 寫操作 ===
def update_product(product_id, new_price):
"""更新商品價格"""
cache_key = f'product:{product_id}'
# 1. 更新數據庫
Product.objects.filter(id=product_id).update(price=new_price)
# 2. 刪除緩存(而不是更新緩存!)
cache.delete(cache_key)⚠️ 為什麼刪除而不是更新?
原因 1:避免併發問題
時刻 線程 A 線程 B
T1 更新 DB: price=100
T2 更新 DB: price=200
T3 更新緩存: price=200
T4 更新緩存: price=100 ⚠️ 緩存變成舊值!原因 2:減少無效更新
# 如果數據很少被讀取,更新緩存就是浪費
Product.objects.filter(id=999).update(stock=50)
cache.set('product:999', ...) # 可能沒人會讀,浪費資源
# 刪除緩存,等真正需要時再加載
cache.delete('product:999') # 更高效🔍 完整範例
from django.core.cache import cache
from django.db import transaction
class ProductService:
@staticmethod
def get(product_id):
"""獲取商品"""
cache_key = f'product:{product_id}'
product = cache.get(cache_key)
if product is None:
product = Product.objects.filter(id=product_id).first()
if product:
cache.set(cache_key, product, timeout=300)
return product
@staticmethod
@transaction.atomic
def update_price(product_id, new_price):
"""更新價格"""
# 1. 更新數據庫
updated = Product.objects.filter(
id=product_id
).update(price=new_price)
if updated:
# 2. 刪除緩存
cache_key = f'product:{product_id}'
cache.delete(cache_key)
# 3. 刪除相關緩存
cache.delete(f'product:list:all')
return True
return False🎯 策略 2:Read Through(讀穿透)
特點
緩存層負責從數據庫加載數據,應用程序只與緩存交互。
流程
應用 → 緩存層 → 緩存命中?→ 返回
↓ 未命中
數據庫 → 寫入緩存 → 返回Django 實現
class CacheLoader:
"""緩存加載器"""
@staticmethod
def get_product(product_id):
"""自動加載的緩存"""
cache_key = f'product:{product_id}'
def load_from_db():
"""從數據庫加載"""
return Product.objects.filter(id=product_id).first()
# 獲取或加載
return cache.get_or_set(
cache_key,
load_from_db,
timeout=300
)
# 使用
product = CacheLoader.get_product(123) # 不關心數據來源進階:自動刷新
import time
from threading import Thread
from django.core.cache import cache
class AutoRefreshCache:
"""自動刷新緩存"""
def __init__(self, key, loader_func, timeout=300, refresh_interval=60):
self.key = key
self.loader_func = loader_func
self.timeout = timeout
self.refresh_interval = refresh_interval
def get(self):
"""獲取數據"""
data = cache.get(self.key)
if data is None:
# 首次加載
data = self._load_and_cache()
# 啟動後台刷新
self._start_refresh_thread()
return data
def _load_and_cache(self):
"""加載並緩存"""
data = self.loader_func()
cache.set(self.key, data, timeout=self.timeout)
return data
def _start_refresh_thread(self):
"""啟動後台刷新線程"""
def refresh():
while True:
time.sleep(self.refresh_interval)
self._load_and_cache()
thread = Thread(target=refresh, daemon=True)
thread.start()
# 使用
def load_hot_products():
return Product.objects.filter(is_hot=True)[:10]
hot_products_cache = AutoRefreshCache(
key='hot_products',
loader_func=load_hot_products,
timeout=300,
refresh_interval=60
)
products = hot_products_cache.get()🎯 策略 3:Write Through(寫穿透)
特點
寫操作同時更新緩存和數據庫。
流程
應用 → 緩存層 → 同時 → 更新緩存
↓
更新數據庫Django 實現
from django.db import transaction
from django.core.cache import cache
class WriteThoughCache:
"""寫穿透緩存"""
@staticmethod
def update_product(product_id, **updates):
"""同時更新緩存和數據庫"""
cache_key = f'product:{product_id}'
with transaction.atomic():
# 1. 更新數據庫
Product.objects.filter(id=product_id).update(**updates)
# 2. 獲取最新數據
product = Product.objects.get(id=product_id)
# 3. 更新緩存
cache.set(cache_key, product, timeout=300)
return product
# 使用
product = WriteThoughCache.update_product(123, price=150, stock=100)⚖️ 優缺點
優點:
- ✅ 緩存始終是最新的
- ✅ 讀取永遠命中
缺點:
- ⚠️ 寫入變慢(雙寫)
- ⚠️ 浪費資源(不常讀的數據也寫緩存)
🎯 策略 4:Write Behind(寫回)
特點
先更新緩存,異步批量更新數據庫。
流程
應用 → 緩存 → 立即返回
↓
(異步)
↓
數據庫Django + Celery 實現
from django.core.cache import cache
from celery import shared_task
# === 異步任務 ===
@shared_task
def sync_product_to_db(product_id, updates):
"""異步同步到數據庫"""
Product.objects.filter(id=product_id).update(**updates)
# === 寫入操作 ===
def update_product_write_behind(product_id, **updates):
"""寫回策略"""
cache_key = f'product:{product_id}'
# 1. 更新緩存(快速返回)
product = cache.get(cache_key)
if product is None:
product = Product.objects.get(id=product_id)
# 更新內存中的對象
for key, value in updates.items():
setattr(product, key, value)
cache.set(cache_key, product, timeout=300)
# 2. 異步更新數據庫
sync_product_to_db.delay(product_id, updates)
return product進階:批量合併更新
from collections import defaultdict
from celery import shared_task
import time
class WriteBehindBuffer:
"""寫回緩衝區"""
_buffer = defaultdict(dict)
@classmethod
def add(cls, model, pk, updates):
"""添加待更新數據"""
key = f"{model}:{pk}"
cls._buffer[key].update(updates)
@classmethod
def flush(cls):
"""批量刷新到數據庫"""
for key, updates in cls._buffer.items():
model, pk = key.split(':')
if model == 'Product':
Product.objects.filter(id=pk).update(**updates)
cls._buffer.clear()
# Celery 定時任務:每 5 秒批量寫入
@shared_task
def flush_write_behind_buffer():
WriteBehindBuffer.flush()
# 使用
def update_product(product_id, **updates):
# 更新緩存
cache_key = f'product:{product_id}'
product = cache.get(cache_key) or Product.objects.get(id=product_id)
for key, value in updates.items():
setattr(product, key, value)
cache.set(cache_key, product, timeout=300)
# 添加到緩衝區(異步批量寫入)
WriteBehindBuffer.add('Product', product_id, updates)⚠️ 風險
數據丟失風險:
- 緩存宕機 → 未同步的數據丟失
- 需要持久化隊列(Celery + RabbitMQ/Redis)
📊 策略選擇指南
決策樹
需要強一致性?
├─ 是 → Write Through(同步雙寫)
└─ 否 ↓
允許數據丟失風險?
├─ 否 → Cache Aside(最常用)
└─ 是 ↓
追求極致性能?
└─ 是 → Write Behind(異步寫入)
緩存層自動管理?
└─ 是 → Read Through(自動加載)對比表
| 策略 | 一致性 | 性能 | 複雜度 | 適用場景 |
|---|---|---|---|---|
| Cache Aside | 最終一致 | ⭐⭐⭐⭐ | ⭐⭐ | 通用場景 |
| Read Through | 最終一致 | ⭐⭐⭐⭐ | ⭐⭐⭐ | 讀多寫少 |
| Write Through | 強一致 | ⭐⭐⭐ | ⭐⭐⭐ | 金融系統 |
| Write Behind | 弱一致 | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐ | 日誌、統計 |
🎯 實戰案例
案例 1:電商商品緩存
from django.core.cache import cache
from django.db import transaction
from django.db.models.signals import post_save, post_delete
from django.dispatch import receiver
class ProductCache:
"""商品緩存管理"""
TIMEOUT = 300
@staticmethod
def get_cache_key(product_id):
return f'product:{product_id}'
@classmethod
def get(cls, product_id):
"""獲取商品(Cache Aside)"""
cache_key = cls.get_cache_key(product_id)
product = cache.get(cache_key)
if product is None:
product = Product.objects.select_related('category').filter(
id=product_id
).first()
if product:
cache.set(cache_key, product, timeout=cls.TIMEOUT)
return product
@classmethod
def invalidate(cls, product_id):
"""失效緩存"""
cache_key = cls.get_cache_key(product_id)
cache.delete(cache_key)
# 同時失效列表緩存
cache.delete('product:list:all')
cache.delete('product:list:hot')
# 信號處理:自動失效緩存
@receiver([post_save, post_delete], sender=Product)
def invalidate_product_cache(sender, instance, **kwargs):
ProductCache.invalidate(instance.id)
# 使用
product = ProductCache.get(123)案例 2:用戶會話緩存
from django.core.cache import cache
class SessionCache:
"""用戶會話緩存(Write Through)"""
@staticmethod
def get_session(session_id):
"""獲取會話"""
cache_key = f'session:{session_id}'
return cache.get(cache_key)
@staticmethod
def update_session(session_id, data, db_sync=True):
"""更新會話(同步到 DB)"""
cache_key = f'session:{session_id}'
# 1. 更新緩存
cache.set(cache_key, data, timeout=1800)
# 2. 同步更新數據庫(可選)
if db_sync:
Session.objects.filter(session_id=session_id).update(
data=data
)
# 使用
SessionCache.update_session('abc123', {'user_id': 456, 'cart': [...]})案例 3:統計數據緩存
from django.core.cache import cache
from celery import shared_task
class StatisticsCache:
"""統計數據緩存(Write Behind)"""
@staticmethod
def increment_view_count(product_id):
"""增加瀏覽量(異步寫入)"""
cache_key = f'stats:views:{product_id}'
# 1. 緩存中增加
new_count = cache.incr(cache_key, delta=1)
# 2. 每 100 次同步一次數據庫
if new_count % 100 == 0:
sync_view_count.delay(product_id, new_count)
return new_count
@shared_task
def sync_view_count(product_id, count):
"""異步同步瀏覽量"""
Product.objects.filter(id=product_id).update(view_count=count)
# 使用
StatisticsCache.increment_view_count(123) # 快速返回🔍 一致性問題深入
問題 1:先刪緩存還是先更新 DB?
方案 A:先刪緩存,再更新 DB
cache.delete(cache_key)
Product.objects.filter(id=product_id).update(price=150)問題:
時刻 線程 A 線程 B
T1 刪除緩存
T2 讀緩存 → 未命中
T3 讀 DB → price=100(舊值)
T4 寫緩存 → price=100
T5 更新 DB → price=150
結果:緩存是舊值 100,DB 是新值 150 ⚠️方案 B:先更新 DB,再刪緩存(推薦)
Product.objects.filter(id=product_id).update(price=150)
cache.delete(cache_key)更安全,但仍有極小概率不一致。
問題 2:延遲雙刪
from django.db import transaction
def update_product_with_double_delete(product_id, new_price):
"""延遲雙刪策略"""
cache_key = f'product:{product_id}'
with transaction.atomic():
# 1. 刪除緩存(第一次)
cache.delete(cache_key)
# 2. 更新數據庫
Product.objects.filter(id=product_id).update(price=new_price)
# 3. 延遲刪除緩存(第二次)
delete_cache_delayed.apply_async(
args=[cache_key],
countdown=1 # 延遲 1 秒
)
@shared_task
def delete_cache_delayed(cache_key):
"""延遲刪除緩存"""
cache.delete(cache_key)💡 最佳實踐
1. 設置合理的過期時間
# 不同數據不同策略
CACHE_TIMEOUTS = {
'user_profile': 1800, # 用戶資料:30 分鐘
'product_detail': 300, # 商品詳情:5 分鐘
'hot_products': 60, # 熱門商品:1 分鐘
'site_config': 86400, # 站點配置:1 天
}2. 監控緩存命中率
from django.core.cache import cache
def get_with_metrics(key, loader_func):
"""帶監控的緩存獲取"""
value = cache.get(key)
if value is not None:
# 命中
cache.incr('metrics:cache_hits')
return value
else:
# 未命中
cache.incr('metrics:cache_misses')
value = loader_func()
cache.set(key, value)
return value3. 使用版本控制
# 統一修改緩存版本,使舊緩存失效
CACHE_VERSION = 2
cache.set('product:123', product, version=CACHE_VERSION)
cache.get('product:123', version=CACHE_VERSION)💡 面試要點
Q1: Cache Aside 為什麼先更新 DB 再刪緩存?
答:
- 先刪緩存:高並發下可能寫入舊數據
- 先更新 DB:即使緩存刪除失敗,下次也會讀到新數據(過期後)
- 可用「延遲雙刪」進一步保證
Q2: Write Through 和 Write Behind 的區別?
答:
- Write Through:同步雙寫,強一致性,性能較低
- Write Behind:異步寫入,高性能,可能丟數據
Q3: 如何保證緩存與數據庫的強一致性?
答:
- 分布式鎖:寫操作加鎖
- Write Through:同步更新
- 訂閱 binlog:監聽 DB 變化自動刪緩存
- 降低緩存時間:減少不一致窗口
Q4: 緩存更新失敗怎麼辦?
答:
- 重試機制:Celery 重試
- 消息隊列:持久化更新任務
- 監控告警:及時發現問題
- 定時校驗:定期對比緩存與 DB
🎓 總結
選擇建議:
- 通用場景 → Cache Aside
- 讀多寫少 → Read Through
- 強一致性 → Write Through
- 高性能統計 → Write Behind
一致性保證:
- 先更新 DB,再刪緩存
- 使用延遲雙刪
- 設置合理的過期時間
- 監控 + 告警
恭喜!你已完成第 09 章「緩存策略實戰」的所有內容!
系列文章:
下一章: 10. SQL 查詢優化