Django 面試準備 11-3:競態條件處理
深入理解並發環境下的競態條件問題與解決方案
目錄
11-3. 競態條件處理(Race Condition Handling)
📌 什麼是競態條件?
簡單說: 多個線程同時操作共享數據,結果取決於執行順序
定義: 當兩個或多個線程並發訪問共享資源,且至少有一個線程進行寫操作時,如果沒有適當的同步機制,程序的行為將變得不可預測。
🔍 經典的競態條件示例
銀行轉帳問題
# ❌ 有競態條件的代碼
class BankAccount:
def __init__(self, balance=0):
self.balance = balance
def withdraw(self, amount):
# 步驟 1: 檢查餘額
if self.balance >= amount:
# 步驟 2: 計算新餘額
new_balance = self.balance - amount
# 步驟 3: 更新餘額
self.balance = new_balance
return True
return False
account = BankAccount(balance=100)競態條件發生:
時間 Thread 1 (提款 80) Thread 2 (提款 80) balance
T1 檢查: 100 >= 80 ✓
T2 檢查: 100 >= 80 ✓ 100
T3 計算: 100 - 80 = 20
T4 計算: 100 - 80 = 20 100
T5 寫入: balance = 20
T6 寫入: balance = 20 20
結果:兩次提款共 160 元,但餘額只減少 80 元!💥
正確應該是:餘額不足,第二次提款失敗🎯 Django 中的常見競態條件
場景 1:庫存扣減
# models.py
class Product(models.Model):
name = models.CharField(max_length=200)
stock = models.IntegerField(default=0)
# ❌ 有競態條件的代碼
def purchase(request, product_id, quantity):
product = Product.objects.get(id=product_id)
# 檢查庫存
if product.stock >= quantity:
# 扣減庫存
product.stock -= quantity
product.save()
return JsonResponse({'status': 'success'})
return JsonResponse({'status': 'insufficient_stock'})問題:
初始庫存:10
時間 Thread 1 (購買 6) Thread 2 (購買 8) stock
T1 讀取: stock = 10
T2 讀取: stock = 10 10
T3 檢查: 10 >= 6 ✓
T4 檢查: 10 >= 8 ✓ 10
T5 計算: 10 - 6 = 4
T6 計算: 10 - 8 = 2 10
T7 保存: stock = 4
T8 保存: stock = 2 2
結果:賣出 14 件商品,但庫存只有 10 件!超賣了!💥場景 2:點讚計數
# models.py
class Post(models.Model):
title = models.CharField(max_length=200)
likes = models.IntegerField(default=0)
# ❌ 有競態條件的代碼
def like_post(request, post_id):
post = Post.objects.get(id=post_id)
# 增加讚數
post.likes += 1
post.save()
return JsonResponse({'likes': post.likes})問題:
100 個用戶同時點讚:
理想情況:likes: 0 → 100
實際情況:likes: 0 → 67(丟失了 33 個讚)💥
原因:likes += 1 不是原子操作場景 3:唯一性檢查
# models.py
class User(models.Model):
username = models.CharField(max_length=50) # 沒有 unique=True
# ❌ 有競態條件的代碼
def register(request):
username = request.POST.get('username')
# 檢查用戶名是否存在
if User.objects.filter(username=username).exists():
return JsonResponse({'error': 'Username exists'})
# 創建用戶
user = User.objects.create(username=username)
return JsonResponse({'user_id': user.id})問題:
時間 Thread 1 Thread 2 數據庫
T1 檢查 "alice" 存在?
→ No
T2 檢查 "alice" 存在?
→ No
T3 創建 User("alice") ✓
T4 創建 User("alice") ✓
結果:創建了兩個相同用戶名的用戶!💥✅ 解決方案 1:數據庫級原子操作
F() 表達式
from django.db.models import F
# ✅ 使用 F() 表達式(原子操作)
def purchase_v1(request, product_id, quantity):
# 一條 SQL 完成:UPDATE ... SET stock = stock - ? WHERE id = ? AND stock >= ?
affected = Product.objects.filter(
id=product_id,
stock__gte=quantity
).update(stock=F('stock') - quantity)
if affected > 0:
return JsonResponse({'status': 'success'})
return JsonResponse({'status': 'insufficient_stock'})生成的 SQL:
UPDATE product
SET stock = stock - 6
WHERE id = 123 AND stock >= 6;為什麼安全? 這是單條原子 SQL,數據庫保證其完整執行
點讚計數優化
# ✅ 使用 F() 表達式
def like_post(request, post_id):
# 原子操作
Post.objects.filter(id=post_id).update(likes=F('likes') + 1)
# 獲取更新後的值
post = Post.objects.get(id=post_id)
return JsonResponse({'likes': post.likes})✅ 解決方案 2:數據庫事務 + 鎖
SELECT FOR UPDATE(悲觀鎖)
from django.db import transaction
# ✅ 使用 select_for_update(悲觀鎖)
@transaction.atomic
def purchase_v2(request, product_id, quantity):
# 鎖定這一行,其他事務必須等待
product = Product.objects.select_for_update().get(id=product_id)
if product.stock >= quantity:
product.stock -= quantity
product.save()
return JsonResponse({'status': 'success'})
return JsonResponse({'status': 'insufficient_stock'})執行流程:
Thread 1 Thread 2
開始事務
SELECT ... FOR UPDATE
(鎖定 product)
開始事務
SELECT ... FOR UPDATE
(等待 Thread 1 釋放鎖)⏰
檢查庫存: 10 >= 6 ✓
扣減庫存: 10 - 6 = 4
保存
提交事務(釋放鎖)✅
(獲得鎖)✓
檢查庫存: 4 >= 8 ❌
返回庫存不足
結果:正確處理!✅SQL:
-- PostgreSQL / MySQL
BEGIN;
SELECT * FROM product WHERE id = 123 FOR UPDATE;
UPDATE product SET stock = 4 WHERE id = 123;
COMMIT;nowait 參數
@transaction.atomic
def purchase_v3(request, product_id, quantity):
try:
# nowait=True:如果無法獲取鎖,立即拋出異常
product = Product.objects.select_for_update(nowait=True).get(id=product_id)
if product.stock >= quantity:
product.stock -= quantity
product.save()
return JsonResponse({'status': 'success'})
return JsonResponse({'status': 'insufficient_stock'})
except DatabaseError:
# 無法獲取鎖,說明其他線程正在處理
return JsonResponse({'status': 'busy', 'message': 'Try again later'})✅ 解決方案 3:樂觀鎖(版本號)
基本概念
悲觀鎖: 假設一定會衝突,提前加鎖 樂觀鎖: 假設不會衝突,更新時檢查版本
# models.py
class Product(models.Model):
name = models.CharField(max_length=200)
stock = models.IntegerField(default=0)
version = models.IntegerField(default=0) # 版本號
# ✅ 使用樂觀鎖
def purchase_v4(request, product_id, quantity):
max_retries = 3
for attempt in range(max_retries):
product = Product.objects.get(id=product_id)
if product.stock < quantity:
return JsonResponse({'status': 'insufficient_stock'})
old_version = product.version
# 嘗試更新(檢查版本號)
affected = Product.objects.filter(
id=product_id,
version=old_version,
stock__gte=quantity
).update(
stock=F('stock') - quantity,
version=F('version') + 1
)
if affected > 0:
# 更新成功
return JsonResponse({'status': 'success'})
# 版本不匹配,重試
continue
return JsonResponse({'status': 'retry_exceeded'})執行流程:
Thread 1 Thread 2
讀取: stock=10, version=1
讀取: stock=10, version=1
更新 WHERE version=1 ✓
version -> 2
更新 WHERE version=1 ❌
(version 已經是 2)
重試...
讀取: stock=4, version=2
檢查庫存不足 ✓✅ 解決方案 4:分布式鎖(Redis)
適用場景
- 多個服務器實例
- 需要跨進程同步
- 高並發場景
from django.core.cache import cache
import time
# ✅ 使用 Redis 分布式鎖
def purchase_v5(request, product_id, quantity):
lock_key = f'lock:product:{product_id}'
# 嘗試獲取鎖(10 秒過期)
acquired = cache.add(lock_key, 'locked', timeout=10)
if not acquired:
return JsonResponse({
'status': 'busy',
'message': 'Another user is purchasing, please wait'
})
try:
product = Product.objects.get(id=product_id)
if product.stock >= quantity:
product.stock -= quantity
product.save()
return JsonResponse({'status': 'success'})
return JsonResponse({'status': 'insufficient_stock'})
finally:
# 釋放鎖
cache.delete(lock_key)更安全的實現(防止誤刪)
import uuid
from django_redis import get_redis_connection
def purchase_v6(request, product_id, quantity):
redis_conn = get_redis_connection('default')
lock_key = f'lock:product:{product_id}'
lock_value = str(uuid.uuid4()) # 唯一標識
# 嘗試獲取鎖
if redis_conn.set(lock_key, lock_value, nx=True, ex=10):
try:
product = Product.objects.get(id=product_id)
if product.stock >= quantity:
product.stock -= quantity
product.save()
return JsonResponse({'status': 'success'})
return JsonResponse({'status': 'insufficient_stock'})
finally:
# 安全釋放鎖(只刪除自己的鎖)
lua_script = """
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("del", KEYS[1])
else
return 0
end
"""
redis_conn.eval(lua_script, 1, lock_key, lock_value)
return JsonResponse({'status': 'busy'})✅ 解決方案 5:唯一約束
數據庫級約束
# models.py
class User(models.Model):
# ✅ 使用 unique 約束
username = models.CharField(max_length=50, unique=True)
def register(request):
username = request.POST.get('username')
try:
# 即使並發,數據庫也會保證唯一性
user = User.objects.create(username=username)
return JsonResponse({'user_id': user.id})
except IntegrityError:
# 用戶名已存在
return JsonResponse({'error': 'Username exists'})複合唯一約束
# models.py
class Vote(models.Model):
user = models.ForeignKey(User, on_delete=models.CASCADE)
post = models.ForeignKey(Post, on_delete=models.CASCADE)
vote_type = models.CharField(max_length=10) # up/down
class Meta:
# ✅ 複合唯一約束:一個用戶對一篇文章只能投一票
unique_together = ['user', 'post']
def vote(request, post_id):
try:
# 並發情況下,只有一個會成功
vote = Vote.objects.create(
user=request.user,
post_id=post_id,
vote_type='up'
)
return JsonResponse({'status': 'success'})
except IntegrityError:
# 已經投過票
return JsonResponse({'error': 'Already voted'})📊 方案對比
| 方案 | 性能 | 複雜度 | 適用場景 | 數據庫支持 |
|---|---|---|---|---|
| F() 表達式 | ⭐⭐⭐⭐⭐ | ⭐ | 簡單字段更新 | 所有 |
| SELECT FOR UPDATE | ⭐⭐⭐ | ⭐⭐ | 複雜業務邏輯 | PostgreSQL, MySQL |
| 樂觀鎖 | ⭐⭐⭐⭐ | ⭐⭐⭐ | 衝突較少 | 所有 |
| 分布式鎖 | ⭐⭐⭐ | ⭐⭐⭐⭐ | 跨服務器 | Redis |
| 唯一約束 | ⭐⭐⭐⭐⭐ | ⭐ | 唯一性檢查 | 所有 |
🎯 實戰案例
案例 1:秒殺系統
from django.db import transaction
from django.db.models import F
def flash_sale(request, product_id):
"""秒殺搶購"""
# 方案 1:F() 表達式(推薦)
affected = Product.objects.filter(
id=product_id,
stock__gt=0
).update(stock=F('stock') - 1)
if affected == 0:
return JsonResponse({'status': 'sold_out'})
# 創建訂單
order = Order.objects.create(
user=request.user,
product_id=product_id,
quantity=1
)
return JsonResponse({'status': 'success', 'order_id': order.id})案例 2:座位預訂
from django.db import transaction
# models.py
class Seat(models.Model):
row = models.IntegerField()
number = models.IntegerField()
is_booked = models.BooleanField(default=False)
booked_by = models.ForeignKey(User, null=True, on_delete=models.SET_NULL)
version = models.IntegerField(default=0)
class Meta:
unique_together = ['row', 'number']
# views.py
@transaction.atomic
def book_seat(request, seat_id):
"""預訂座位(樂觀鎖)"""
max_retries = 3
for attempt in range(max_retries):
seat = Seat.objects.get(id=seat_id)
if seat.is_booked:
return JsonResponse({'error': 'Seat already booked'})
old_version = seat.version
# 嘗試預訂
affected = Seat.objects.filter(
id=seat_id,
version=old_version,
is_booked=False
).update(
is_booked=True,
booked_by=request.user,
version=F('version') + 1
)
if affected > 0:
return JsonResponse({'status': 'success'})
# 重試
time.sleep(0.01)
return JsonResponse({'error': 'Booking failed, please retry'})案例 3:帳戶餘額轉帳
from django.db import transaction
@transaction.atomic
def transfer_money(from_user_id, to_user_id, amount):
"""轉帳(使用悲觀鎖)"""
# 鎖定兩個帳戶(按 ID 順序,防止死鎖)
user_ids = sorted([from_user_id, to_user_id])
users = User.objects.select_for_update().filter(
id__in=user_ids
).order_by('id')
from_user = users.get(id=from_user_id)
to_user = users.get(id=to_user_id)
# 檢查餘額
if from_user.balance < amount:
raise ValueError("Insufficient balance")
# 轉帳
from_user.balance -= amount
to_user.balance += amount
from_user.save()
to_user.save()
return True🔍 如何檢測競態條件?
1. 並發測試
import threading
from django.test import TestCase
class RaceConditionTest(TestCase):
def test_concurrent_purchase(self):
"""測試並發購買"""
# 創建商品,庫存 10
product = Product.objects.create(name='Test', stock=10)
results = []
def purchase():
# 嘗試購買 6 件
response = self.client.post(f'/purchase/{product.id}/', {'quantity': 6})
results.append(response.json()['status'])
# 啟動 3 個線程同時購買
threads = [threading.Thread(target=purchase) for _ in range(3)]
for t in threads:
t.start()
for t in threads:
t.join()
# 檢查結果
successes = results.count('success')
self.assertEqual(successes, 1, "應該只有一個購買成功")
# 檢查庫存
product.refresh_from_db()
self.assertEqual(product.stock, 4, "庫存應該是 4")2. 壓力測試
# 使用 locust 測試
pip install locust
# locustfile.py
from locust import HttpUser, task
class StressTest(HttpUser):
@task
def purchase(self):
self.client.post('/purchase/123/', json={'quantity': 1})
# 運行
locust -f locustfile.py --users 100 --spawn-rate 103. 代碼審查
# 檢查這些模式:
# ❌ 讀-改-寫操作
obj = Model.objects.get(id=1)
obj.field += 1
obj.save()
# ❌ 檢查-然後-操作
if Model.objects.filter(...).exists():
Model.objects.create(...)
# ❌ 沒有事務保護的多步操作
user.balance -= 100
user.save()
merchant.balance += 100
merchant.save()💡 最佳實踐
1. 優先使用數據庫原子操作
# ✅ 優先級 1:F() 表達式
Product.objects.filter(id=1).update(stock=F('stock') - 1)
# ✅ 優先級 2:唯一約束
User.objects.create(username='alice') # unique=True
# ✅ 優先級 3:SELECT FOR UPDATE
with transaction.atomic():
obj = Model.objects.select_for_update().get(id=1)
obj.field += 1
obj.save()2. 避免長時間持有鎖
# ❌ 不好:鎖持有太久
@transaction.atomic
def process(request):
obj = Model.objects.select_for_update().get(id=1)
# 執行耗時操作
time.sleep(5) # 糟糕!
obj.save()
# ✅ 好:縮小鎖範圍
def process(request):
# 耗時操作放在外面
result = expensive_computation()
# 只在必要時加鎖
with transaction.atomic():
obj = Model.objects.select_for_update().get(id=1)
obj.field = result
obj.save()3. 注意死鎖
# ❌ 可能死鎖
def func1():
with transaction.atomic():
a = A.objects.select_for_update().get(id=1)
b = B.objects.select_for_update().get(id=2)
def func2():
with transaction.atomic():
b = B.objects.select_for_update().get(id=2) # 順序相反!
a = A.objects.select_for_update().get(id=1)
# ✅ 避免死鎖:固定鎖順序
def lock_in_order(*ids):
return Model.objects.select_for_update().filter(
id__in=ids
).order_by('id')💡 面試要點
Q1: 什麼是競態條件?如何避免?
答:
- 多個線程並發訪問共享資源,結果不可預測
- 避免方法:原子操作、鎖、事務、版本控制
Q2: 樂觀鎖和悲觀鎖的區別?
答:
- 悲觀鎖:SELECT FOR UPDATE,假設一定衝突,提前加鎖
- 樂觀鎖:版本號檢查,假設不衝突,更新時檢測
- 悲觀鎖性能低但安全,樂觀鎖性能高但需重試
Q3: F() 表達式為什麼是原子的?
答:
- 生成單條 SQL:
UPDATE ... SET field = field + 1 - 在數據庫層執行,不經過 Python
- 數據庫保證 UPDATE 的原子性
Q4: 如何選擇並發控制方案?
答:
- 簡單字段更新 → F() 表達式
- 複雜業務邏輯 → SELECT FOR UPDATE
- 衝突較少 → 樂觀鎖
- 跨服務器 → 分布式鎖
🔗 下一篇
在下一篇文章中,我們將深入學習 threading.local 原理,了解 Django 如何使用線程本地存儲實現請求隔離。
閱讀時間:10 分鐘