04-4. Lock / Mutex / Semaphore 完整指南

⏱️ 閱讀時間: 15 分鐘 🎯 難度: ⭐⭐⭐ (中等)


🎯 本篇重點

完整掌握 Lock、Mutex、Semaphore 等同步機制的原理、用法、差異及最佳實踐,學會選擇最適合的同步工具。


🤔 同步機制概覽

同步機制是用來協調多個 Thread 存取共享資源的工具,防止 Race Condition 和 Deadlock。

同步機制家族
├─ Lock(鎖)
│  ├─ Mutex(互斥鎖)
│  └─ RLock(可重入鎖)
├─ Semaphore(信號量)
│  ├─ Binary Semaphore(二元信號量)
│  └─ Counting Semaphore(計數信號量)
├─ Event(事件)
├─ Condition(條件變數)
└─ Barrier(屏障)

1️⃣ Lock(互斥鎖 / Mutex)

基本概念

Lock = 互斥鎖(Mutual Exclusion Lock)

from threading import Lock

lock = Lock()

# 臨界區(Critical Section)
lock.acquire()  # 獲取鎖
try:
    # 只有一個 Thread 可以進入
    shared_resource += 1
finally:
    lock.release()  # 釋放鎖

使用 with 語句(推薦)

from threading import Lock

lock = Lock()

# with 自動處理 acquire 和 release
with lock:
    # 臨界區
    shared_resource += 1

完整範例:安全計數器

from threading import Thread, Lock

class SafeCounter:
    def __init__(self):
        self.count = 0
        self.lock = Lock()

    def increment(self):
        with self.lock:
            self.count += 1

    def decrement(self):
        with self.lock:
            self.count -= 1

    def get_value(self):
        with self.lock:
            return self.count

# 測試
counter = SafeCounter()

def worker():
    for _ in range(100000):
        counter.increment()

threads = [Thread(target=worker) for _ in range(10)]

for t in threads:
    t.start()
for t in threads:
    t.join()

print(f"最終計數: {counter.get_value()}")  # 1000000(正確)

Lock 的狀態

from threading import Lock

lock = Lock()

# 檢查鎖的狀態
print(f"鎖定: {lock.locked()}")  # False

lock.acquire()
print(f"鎖定: {lock.locked()}")  # True

lock.release()
print(f"鎖定: {lock.locked()}")  # False

非阻塞獲取鎖

from threading import Lock
import time

lock = Lock()

# 阻塞獲取(預設)
lock.acquire()  # 會等待直到獲得鎖

# 非阻塞獲取
if lock.acquire(blocking=False):
    print("獲得鎖")
    lock.release()
else:
    print("無法獲得鎖,繼續其他工作")

# 帶超時的獲取
if lock.acquire(timeout=2):  # 最多等 2 秒
    print("獲得鎖")
    lock.release()
else:
    print("超時,無法獲得鎖")

2️⃣ RLock(可重入鎖)

問題:Lock 不能重複獲取

from threading import Lock

lock = Lock()

lock.acquire()
lock.acquire()  # ← 死鎖!無法再次獲取

解決:使用 RLock

from threading import RLock

rlock = RLock()

rlock.acquire()
rlock.acquire()  # ✅ OK,可以重複獲取
rlock.release()
rlock.release()  # 必須釋放相同次數

使用場景:遞迴函式

from threading import RLock

class BankAccount:
    def __init__(self):
        self.balance = 0
        self.lock = RLock()  # 使用 RLock

    def deposit(self, amount):
        with self.lock:
            self.balance += amount
            print(f"存入 {amount},餘額 {self.balance}")

    def withdraw(self, amount):
        with self.lock:
            if self.balance >= amount:
                self.balance -= amount
                print(f"提款 {amount},餘額 {self.balance}")
                return True
            return False

    def transfer_to_savings(self, amount):
        """轉帳到儲蓄帳戶(需要再次獲取鎖)"""
        with self.lock:  # 第一次獲取
            if self.withdraw(amount):  # 內部又獲取了鎖(RLock 允許)
                print(f"轉帳 {amount} 到儲蓄帳戶")
                return True
            return False

account = BankAccount()
account.deposit(1000)
account.transfer_to_savings(500)

Lock vs RLock

特性LockRLock
重複獲取❌ 不可以(死鎖)✅ 可以
效能⚡ 快🐢 較慢
記憶體大(需追蹤持有者)
適用場景簡單臨界區遞迴、嵌套呼叫

3️⃣ Semaphore(信號量)

基本概念

Semaphore = 計數器,限制同時存取資源的 Thread 數量

from threading import Semaphore

# 創建信號量(最多 3 個 Thread)
semaphore = Semaphore(3)

# 獲取資源
semaphore.acquire()  # 計數 -1

# 使用資源
# ...

# 釋放資源
semaphore.release()  # 計數 +1

完整範例:限制並發連線

from threading import Thread, Semaphore
import time

# 最多 3 個 Thread 同時存取資料庫
db_connection_pool = Semaphore(3)

def access_database(user_id):
    print(f"用戶 {user_id} 等待資料庫連線...")

    with db_connection_pool:  # 獲取連線
        print(f"用戶 {user_id} 連線到資料庫")
        time.sleep(2)  # 模擬查詢
        print(f"用戶 {user_id} 查詢完成")

# 創建 10 個 Thread
threads = [Thread(target=access_database, args=(i,)) for i in range(10)]

for t in threads:
    t.start()

for t in threads:
    t.join()

輸出:

用戶 0 等待資料庫連線...
用戶 0 連線到資料庫
用戶 1 等待資料庫連線...
用戶 1 連線到資料庫
用戶 2 等待資料庫連線...
用戶 2 連線到資料庫
用戶 3 等待資料庫連線...  ← 等待中
用戶 4 等待資料庫連線...  ← 等待中
...
用戶 0 查詢完成  ← 釋放連線
用戶 3 連線到資料庫  ← 獲得連線
...

BoundedSemaphore(有界信號量)

from threading import BoundedSemaphore

# BoundedSemaphore 防止 release 超過初始值
semaphore = BoundedSemaphore(3)

semaphore.acquire()
semaphore.release()
semaphore.release()
semaphore.release()
semaphore.release()  # ← ValueError! 超過初始值

實戰案例:資源池

from threading import Thread, Semaphore
import time

class ResourcePool:
    def __init__(self, max_resources):
        self.semaphore = Semaphore(max_resources)
        self.resources = [f"Resource-{i}" for i in range(max_resources)]
        self.lock = Lock()

    def acquire_resource(self):
        """獲取資源"""
        self.semaphore.acquire()
        with self.lock:
            resource = self.resources.pop()
        return resource

    def release_resource(self, resource):
        """釋放資源"""
        with self.lock:
            self.resources.append(resource)
        self.semaphore.release()

pool = ResourcePool(max_resources=3)

def worker(worker_id):
    resource = pool.acquire_resource()
    print(f"Worker {worker_id} 獲得 {resource}")
    time.sleep(2)
    print(f"Worker {worker_id} 釋放 {resource}")
    pool.release_resource(resource)

threads = [Thread(target=worker, args=(i,)) for i in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()

4️⃣ Event(事件)

基本概念

Event = 信號機制,用於 Thread 間通知

from threading import Thread, Event
import time

event = Event()

def waiter():
    print("等待事件...")
    event.wait()  # 阻塞直到事件被設置
    print("事件發生,繼續執行")

def setter():
    print("準備觸發事件...")
    time.sleep(3)
    event.set()  # 設置事件
    print("事件已觸發")

Thread(target=waiter).start()
Thread(target=setter).start()

Event 方法

from threading import Event

event = Event()

# 設置事件
event.set()

# 清除事件
event.clear()

# 檢查事件狀態
if event.is_set():
    print("事件已設置")

# 等待事件
event.wait()  # 阻塞直到事件被設置
event.wait(timeout=5)  # 最多等 5 秒

實戰案例:啟動信號

from threading import Thread, Event
import time

ready_event = Event()

def initialize():
    """初始化服務"""
    print("正在初始化...")
    time.sleep(3)  # 模擬初始化
    print("初始化完成")
    ready_event.set()  # 通知已就緒

def worker(worker_id):
    """Worker 等待初始化完成"""
    print(f"Worker {worker_id} 等待初始化...")
    ready_event.wait()  # 等待初始化
    print(f"Worker {worker_id} 開始工作")

# 啟動初始化
Thread(target=initialize).start()

# 啟動 Worker
for i in range(3):
    Thread(target=worker, args=(i,)).start()

5️⃣ Condition(條件變數)

基本概念

Condition = Lock + Event 的組合,用於複雜的等待/通知場景

from threading import Condition

condition = Condition()

# 等待條件
with condition:
    condition.wait()  # 釋放鎖並等待通知

# 通知等待者
with condition:
    condition.notify()  # 通知一個等待者
    condition.notify_all()  # 通知所有等待者

完整範例:生產者-消費者

from threading import Thread, Condition
import time
import random

class Queue:
    def __init__(self, max_size):
        self.items = []
        self.max_size = max_size
        self.condition = Condition()

    def produce(self, item):
        """生產者"""
        with self.condition:
            # 等待隊列有空間
            while len(self.items) >= self.max_size:
                print(f"隊列已滿,等待...")
                self.condition.wait()

            self.items.append(item)
            print(f"生產: {item},隊列: {len(self.items)}/{self.max_size}")

            # 通知消費者
            self.condition.notify()

    def consume(self):
        """消費者"""
        with self.condition:
            # 等待隊列有資料
            while len(self.items) == 0:
                print(f"隊列為空,等待...")
                self.condition.wait()

            item = self.items.pop(0)
            print(f"消費: {item},隊列: {len(self.items)}/{self.max_size}")

            # 通知生產者
            self.condition.notify()
            return item

queue = Queue(max_size=5)

def producer():
    for i in range(10):
        queue.produce(f"Item-{i}")
        time.sleep(random.uniform(0.1, 0.5))

def consumer():
    for _ in range(10):
        queue.consume()
        time.sleep(random.uniform(0.2, 0.8))

Thread(target=producer).start()
Thread(target=consumer).start()

6️⃣ Barrier(屏障)

基本概念

Barrier = 同步點,讓多個 Thread 在此等待,直到所有 Thread 都到達

from threading import Thread, Barrier
import time

# 創建屏障(3 個 Thread)
barrier = Barrier(3)

def worker(worker_id):
    print(f"Worker {worker_id} 階段 1")
    time.sleep(worker_id)

    print(f"Worker {worker_id} 到達屏障")
    barrier.wait()  # 等待其他 Thread

    print(f"Worker {worker_id} 階段 2")

threads = [Thread(target=worker, args=(i,)) for i in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()

輸出:

Worker 0 階段 1
Worker 1 階段 1
Worker 2 階段 1
Worker 0 到達屏障  ← 等待
Worker 1 到達屏障  ← 等待
Worker 2 到達屏障  ← 所有人到齊
Worker 0 階段 2    ← 同時繼續
Worker 1 階段 2
Worker 2 階段 2

📊 同步機制對比

功能對比

機制用途適用場景
Lock互斥存取保護共享資源
RLock可重入互斥遞迴、嵌套呼叫
Semaphore限制並發數資源池、連線池
Event簡單通知啟動信號、停止信號
Condition複雜等待/通知生產者-消費者
Barrier同步點階段性同步

選擇指南

# 保護共享變數 → Lock
counter = 0
lock = Lock()
with lock:
    counter += 1

# 遞迴函式 → RLock
rlock = RLock()
def recursive(n):
    with rlock:
        if n > 0:
            recursive(n - 1)

# 限制並發 → Semaphore
pool = Semaphore(5)
with pool:
    use_limited_resource()

# 等待初始化 → Event
ready = Event()
ready.wait()

# 生產者-消費者 → Condition
condition = Condition()
with condition:
    condition.wait()

# 階段同步 → Barrier
barrier = Barrier(3)
barrier.wait()

🎯 最佳實踐

1. 總是使用 with

# ❌ 錯誤:可能忘記釋放
lock.acquire()
do_something()
lock.release()

# ✅ 正確:自動釋放
with lock:
    do_something()

2. 避免嵌套鎖

# ❌ 危險:可能死鎖
with lock_a:
    with lock_b:
        pass

# ✅ 較好:按順序獲取
locks = [lock_a, lock_b]
locks.sort(key=id)  # 按 ID 排序
for lock in locks:
    lock.acquire()
try:
    pass
finally:
    for lock in reversed(locks):
        lock.release()

3. 設置合理的超時

# ✅ 使用超時避免永遠等待
if lock.acquire(timeout=5):
    try:
        do_something()
    finally:
        lock.release()
else:
    print("獲取鎖超時")

4. 選擇最簡單的機制

# 如果 Lock 夠用,不要用 RLock
# 如果 Event 夠用,不要用 Condition
# 越簡單越好!

✅ 重點回顧

Lock(互斥鎖):

  • ✅ 最基本的同步機制
  • ✅ 保護臨界區
  • ✅ 同一時間只有一個 Thread
  • ❌ 不能重複獲取

RLock(可重入鎖):

  • ✅ 可以重複獲取
  • ✅ 適合遞迴、嵌套
  • ⚠️ 效能較 Lock 差

Semaphore(信號量):

  • ✅ 限制並發數量
  • ✅ 適合資源池
  • ✅ 計數器機制

Event(事件):

  • ✅ 簡單的通知機制
  • ✅ 一對多通知
  • ✅ 適合啟動/停止信號

Condition(條件變數):

  • ✅ 複雜的等待/通知
  • ✅ 生產者-消費者模式
  • ✅ Lock + Event 的組合

Barrier(屏障):

  • ✅ 同步點
  • ✅ 等待所有 Thread 到達
  • ✅ 階段性同步

關鍵原則:

  • ✅ 總是使用 with 語句
  • ✅ 避免嵌套鎖(防止死鎖)
  • ✅ 設置合理超時
  • ✅ 選擇最簡單的機制

上一篇: 04-3. Deadlock(死鎖)


最後更新:2025-01-06

0%