Django 面試準備 08-3:分布式鎖實現

深入理解分布式環境下的鎖機制與實戰應用

08-3. 分布式鎖實現

📌 什麼是分布式鎖?

在分布式系統中,當多台服務器需要訪問同一個共享資源時,需要一種機制來協調它們的訪問順序,這就是分布式鎖

為什麼需要分布式鎖?

單機環境 vs 分布式環境

# ❌ 單機環境:Python 線程鎖可以工作
import threading

lock = threading.Lock()

def update_inventory():
    with lock:
        # 臨界區代碼
        inventory = get_inventory()
        inventory -= 1
        save_inventory(inventory)

問題: 當有多台 Django 服務器時,每台服務器有自己的內存空間,threading.Lock() 無法跨進程工作!

服務器 A (lock 1)  →  數據庫  ←  服務器 B (lock 2)
     ↓                                ↓
各自的鎖互不影響,仍然會出現競態條件!

🎯 分布式鎖的特性

一個好的分布式鎖應該具備:

1. 互斥性

在任意時刻,只有一個客戶端能持有鎖

2. 不會死鎖

即使持有鎖的客戶端崩潰,鎖最終也能被釋放

3. 容錯性

只要大部分節點正常運行,客戶端就能加鎖和解鎖

4. 加鎖和解鎖必須是同一個客戶端

不能把別人的鎖給解了


💻 實現方式對比

方案優點缺點適用場景
Redis性能高、實現簡單可能存在鎖丟失風險大部分場景
數據庫可靠性高、無需額外組件性能較差低並發場景
Zookeeper可靠性最高、支持阻塞鎖複雜度高、需額外維護對一致性要求極高的場景

🔴 方案 1:Redis 分布式鎖(推薦)

基礎版本:SETNX + EXPIRE

import redis
import time
import uuid

class RedisLock:
    """基礎版本的 Redis 分布式鎖"""

    def __init__(self, redis_client, lock_name, expire_time=10):
        """
        :param redis_client: Redis 客戶端
        :param lock_name: 鎖的名稱
        :param expire_time: 鎖的過期時間(秒)
        """
        self.redis_client = redis_client
        self.lock_name = f'lock:{lock_name}'
        self.expire_time = expire_time
        self.lock_value = str(uuid.uuid4())  # 唯一標識

    def acquire(self):
        """獲取鎖"""
        # SET key value NX EX seconds
        # NX: 只在鍵不存在時設置
        # EX: 設置過期時間(秒)
        result = self.redis_client.set(
            self.lock_name,
            self.lock_value,
            nx=True,
            ex=self.expire_time
        )
        return result is not None

    def release(self):
        """釋放鎖(使用 Lua 腳本確保原子性)"""
        lua_script = """
        if redis.call('GET', KEYS[1]) == ARGV[1] then
            return redis.call('DEL', KEYS[1])
        else
            return 0
        end
        """
        self.redis_client.eval(lua_script, 1, self.lock_name, self.lock_value)


# 使用示例
redis_client = redis.StrictRedis(host='localhost', port=6379, db=0)
lock = RedisLock(redis_client, 'product:123')

if lock.acquire():
    try:
        # 執行臨界區代碼
        print('獲取鎖成功,執行業務邏輯')
    finally:
        lock.release()
else:
    print('獲取鎖失敗')

⚠️ 基礎版本的問題

問題 1:鎖過期時間設置多長?

# 如果設置 5 秒
lock = RedisLock(redis_client, 'product:123', expire_time=5)

# 但業務執行了 6 秒...
if lock.acquire():
    time.sleep(6)  # 模擬長時間業務
    # 此時鎖已經過期,別人可能已經獲取到鎖了!
    lock.release()

問題 2:如何自動續期?


進階版本:自動續期

import threading

class RedisLockWithRenewal:
    """帶自動續期的 Redis 分布式鎖"""

    def __init__(self, redis_client, lock_name, expire_time=10):
        self.redis_client = redis_client
        self.lock_name = f'lock:{lock_name}'
        self.expire_time = expire_time
        self.lock_value = str(uuid.uuid4())
        self._stop_renewal = threading.Event()
        self._renewal_thread = None

    def acquire(self):
        """獲取鎖"""
        result = self.redis_client.set(
            self.lock_name,
            self.lock_value,
            nx=True,
            ex=self.expire_time
        )

        if result:
            # 啟動續期線程
            self._start_renewal()
            return True

        return False

    def _start_renewal(self):
        """啟動自動續期線程"""
        def renew():
            while not self._stop_renewal.is_set():
                # 每隔 expire_time/3 續期一次
                time.sleep(self.expire_time / 3)

                # 檢查鎖是否還是自己的,如果是就續期
                lua_script = """
                if redis.call('GET', KEYS[1]) == ARGV[1] then
                    return redis.call('EXPIRE', KEYS[1], ARGV[2])
                else
                    return 0
                end
                """
                self.redis_client.eval(
                    lua_script,
                    1,
                    self.lock_name,
                    self.lock_value,
                    self.expire_time
                )

        self._renewal_thread = threading.Thread(target=renew, daemon=True)
        self._renewal_thread.start()

    def release(self):
        """釋放鎖"""
        # 停止續期線程
        self._stop_renewal.set()

        # 刪除鎖
        lua_script = """
        if redis.call('GET', KEYS[1]) == ARGV[1] then
            return redis.call('DEL', KEYS[1])
        else
            return 0
        end
        """
        self.redis_client.eval(lua_script, 1, self.lock_name, self.lock_value)

    def __enter__(self):
        """支持 with 語句"""
        if not self.acquire():
            raise Exception('Failed to acquire lock')
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """支持 with 語句"""
        self.release()


# 使用示例(推薦)
with RedisLockWithRenewal(redis_client, 'product:123') as lock:
    # 執行業務邏輯,無需擔心鎖過期
    time.sleep(20)  # 即使執行很久也沒問題
    print('業務執行完成')

🏆 生產級方案:使用 redis-py-lock

實際項目中,推薦使用經過充分測試的庫:

pip install redis-lock
import redis
import redis_lock

redis_client = redis.StrictRedis(host='localhost', port=6379, db=0)

# 使用 redis-lock 庫
lock = redis_lock.Lock(redis_client, 'product:123', expire=60, auto_renewal=True)

with lock:
    # 執行業務邏輯
    # 鎖會自動續期,無需擔心過期
    print('執行業務邏輯')

🟢 方案 2:數據庫分布式鎖

適合已經有數據庫、不想引入 Redis 的場景。

數據表設計

from django.db import models

class DistributedLock(models.Model):
    """數據庫分布式鎖"""
    lock_name = models.CharField(max_length=200, unique=True, primary_key=True)
    owner = models.CharField(max_length=100)  # 鎖的持有者
    acquired_at = models.DateTimeField(auto_now_add=True)
    expire_at = models.DateTimeField()  # 過期時間

    class Meta:
        db_table = 'distributed_lock'

實現

from django.db import transaction, IntegrityError
from django.utils import timezone
from datetime import timedelta
import uuid

class DatabaseLock:
    """基於數據庫的分布式鎖"""

    def __init__(self, lock_name, expire_seconds=60):
        self.lock_name = lock_name
        self.owner = str(uuid.uuid4())
        self.expire_seconds = expire_seconds

    def acquire(self, timeout=10):
        """
        嘗試獲取鎖
        :param timeout: 超時時間(秒)
        """
        start_time = time.time()

        while time.time() - start_time < timeout:
            try:
                with transaction.atomic():
                    # 1. 清理過期的鎖
                    DistributedLock.objects.filter(
                        lock_name=self.lock_name,
                        expire_at__lt=timezone.now()
                    ).delete()

                    # 2. 嘗試插入鎖記錄
                    DistributedLock.objects.create(
                        lock_name=self.lock_name,
                        owner=self.owner,
                        expire_at=timezone.now() + timedelta(seconds=self.expire_seconds)
                    )

                    return True

            except IntegrityError:
                # 鎖已被其他人持有
                time.sleep(0.1)

        return False

    def release(self):
        """釋放鎖"""
        DistributedLock.objects.filter(
            lock_name=self.lock_name,
            owner=self.owner
        ).delete()

    def __enter__(self):
        if not self.acquire():
            raise Exception('Failed to acquire lock')
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.release()


# 使用示例
with DatabaseLock('product:123') as lock:
    # 執行業務邏輯
    print('執行業務邏輯')

⚖️ 優缺點

優點:

  • 無需額外組件(使用現有數據庫)
  • 可靠性高

缺點:

  • 性能較差(需要 SQL 查詢)
  • 數據庫壓力大

🎯 Django 實戰案例

案例:防止重複支付

from django.db import transaction
import redis
import redis_lock

redis_client = redis.StrictRedis(host='localhost', port=6379, db=0)

def process_payment(order_id, user_id):
    """
    處理支付請求
    使用分布式鎖防止重複支付
    """
    lock_name = f'payment:order:{order_id}'

    # 使用分布式鎖
    lock = redis_lock.Lock(redis_client, lock_name, expire=60, auto_renewal=True)

    if not lock.acquire(blocking=False):
        return {'status': 'error', 'message': '訂單正在處理中'}

    try:
        with transaction.atomic():
            # 1. 查詢訂單
            order = Order.objects.select_for_update().get(id=order_id)

            # 2. 檢查訂單狀態
            if order.status != 'pending':
                return {'status': 'error', 'message': '訂單狀態異常'}

            # 3. 調用第三方支付接口
            payment_result = call_payment_gateway(order)

            # 4. 更新訂單狀態
            if payment_result['success']:
                order.status = 'paid'
                order.save()
                return {'status': 'success'}
            else:
                return {'status': 'error', 'message': '支付失敗'}

    finally:
        lock.release()

案例:防止重複執行定時任務

from celery import shared_task

@shared_task
def daily_report():
    """
    每日報表生成
    使用分布式鎖確保只有一個任務在執行
    """
    lock_name = 'task:daily_report'
    lock = redis_lock.Lock(redis_client, lock_name, expire=3600, auto_renewal=True)

    if not lock.acquire(blocking=False):
        # 任務已經在執行中
        print('Daily report task is already running')
        return

    try:
        # 生成報表
        generate_report()
        print('Daily report generated successfully')

    finally:
        lock.release()

🔥 高級話題:Redlock 算法

為什麼需要 Redlock?

單個 Redis 實例的分布式鎖存在問題:

情況:Redis 主從架構
1. 客戶端 A 在 Master 上獲取鎖
2. Master 宕機,鎖數據還沒同步到 Slave
3. Slave 升級為新的 Master
4. 客戶端 B 在新 Master 上成功獲取鎖
結果:兩個客戶端同時持有鎖!

Redlock 方案

使用 多個獨立的 Redis 實例(至少5個):

from redlock import Redlock

# 配置多個 Redis 節點
dlm = Redlock([
    {"host": "redis1", "port": 6379, "db": 0},
    {"host": "redis2", "port": 6379, "db": 0},
    {"host": "redis3", "port": 6379, "db": 0},
    {"host": "redis4", "port": 6379, "db": 0},
    {"host": "redis5", "port": 6379, "db": 0},
])

# 獲取鎖
lock = dlm.lock("product:123", 10000)  # 10秒過期

if lock:
    try:
        # 執行業務邏輯
        print('獲取鎖成功')
    finally:
        dlm.unlock(lock)
else:
    print('獲取鎖失敗')

原理:

  • 向所有節點請求鎖
  • 只有超過半數(N/2 + 1)節點成功獲取鎖,才算成功
  • 即使部分節點失敗,整體仍然可靠

📊 性能對比

方案QPS延遲可靠性
Redis 單實例10,000+1-5ms⭐⭐⭐
Redis Redlock5,000+5-10ms⭐⭐⭐⭐⭐
數據庫鎖1,000+10-50ms⭐⭐⭐⭐
Zookeeper3,000+10-20ms⭐⭐⭐⭐⭐

💡 面試要點

Q1: 為什麼 Redis 鎖需要設置過期時間?

答: 防止死鎖。如果持有鎖的客戶端崩潰,沒有過期時間的話,鎖永遠不會被釋放。

Q2: 為什麼釋放鎖要用 Lua 腳本?

答: 確保「檢查鎖是否是自己的」和「刪除鎖」這兩個操作的原子性,避免誤刪別人的鎖。

# ❌ 錯誤寫法
if redis_client.get(lock_name) == my_value:
    # 此時鎖可能已經過期,被別人獲取了
    redis_client.delete(lock_name)  # 誤刪別人的鎖!

# ✅ 正確寫法:使用 Lua 腳本
lua_script = """
if redis.call('GET', KEYS[1]) == ARGV[1] then
    return redis.call('DEL', KEYS[1])
else
    return 0
end
"""

Q3: Redis 單實例鎖 vs Redlock,如何選擇?

答:

  • Redis 單實例:適合大部分場景,性能好,簡單
  • Redlock:對一致性要求極高的場景(如金融系統)

實際上,Redis 單實例 + 主從 + 哨兵已經足夠可靠。

Q4: 如何避免鎖超時?

答: 使用自動續期機制。在業務執行過程中,定期檢查並延長鎖的過期時間。


🔗 下一篇

在下一篇文章中,我們將學習 防止超賣的完整方案,結合前面學到的庫存扣減、秒殺系統、分布式鎖,構建一個完整的防超賣體系。

閱讀時間:12 分鐘

0%