目錄
04-4. Lock / Mutex / Semaphore 完整指南
⏱️ 閱讀時間: 15 分鐘 🎯 難度: ⭐⭐⭐ (中等)
🎯 本篇重點
完整掌握 Lock、Mutex、Semaphore 等同步機制的原理、用法、差異及最佳實踐,學會選擇最適合的同步工具。
🤔 同步機制概覽
同步機制是用來協調多個 Thread 存取共享資源的工具,防止 Race Condition 和 Deadlock。
同步機制家族
├─ Lock(鎖)
│ ├─ Mutex(互斥鎖)
│ └─ RLock(可重入鎖)
├─ Semaphore(信號量)
│ ├─ Binary Semaphore(二元信號量)
│ └─ Counting Semaphore(計數信號量)
├─ Event(事件)
├─ Condition(條件變數)
└─ Barrier(屏障)1️⃣ Lock(互斥鎖 / Mutex)
基本概念
Lock = 互斥鎖(Mutual Exclusion Lock)
from threading import Lock
lock = Lock()
# 臨界區(Critical Section)
lock.acquire() # 獲取鎖
try:
# 只有一個 Thread 可以進入
shared_resource += 1
finally:
lock.release() # 釋放鎖使用 with 語句(推薦)
from threading import Lock
lock = Lock()
# with 自動處理 acquire 和 release
with lock:
# 臨界區
shared_resource += 1完整範例:安全計數器
from threading import Thread, Lock
class SafeCounter:
def __init__(self):
self.count = 0
self.lock = Lock()
def increment(self):
with self.lock:
self.count += 1
def decrement(self):
with self.lock:
self.count -= 1
def get_value(self):
with self.lock:
return self.count
# 測試
counter = SafeCounter()
def worker():
for _ in range(100000):
counter.increment()
threads = [Thread(target=worker) for _ in range(10)]
for t in threads:
t.start()
for t in threads:
t.join()
print(f"最終計數: {counter.get_value()}") # 1000000(正確)Lock 的狀態
from threading import Lock
lock = Lock()
# 檢查鎖的狀態
print(f"鎖定: {lock.locked()}") # False
lock.acquire()
print(f"鎖定: {lock.locked()}") # True
lock.release()
print(f"鎖定: {lock.locked()}") # False非阻塞獲取鎖
from threading import Lock
import time
lock = Lock()
# 阻塞獲取(預設)
lock.acquire() # 會等待直到獲得鎖
# 非阻塞獲取
if lock.acquire(blocking=False):
print("獲得鎖")
lock.release()
else:
print("無法獲得鎖,繼續其他工作")
# 帶超時的獲取
if lock.acquire(timeout=2): # 最多等 2 秒
print("獲得鎖")
lock.release()
else:
print("超時,無法獲得鎖")2️⃣ RLock(可重入鎖)
問題:Lock 不能重複獲取
from threading import Lock
lock = Lock()
lock.acquire()
lock.acquire() # ← 死鎖!無法再次獲取解決:使用 RLock
from threading import RLock
rlock = RLock()
rlock.acquire()
rlock.acquire() # ✅ OK,可以重複獲取
rlock.release()
rlock.release() # 必須釋放相同次數使用場景:遞迴函式
from threading import RLock
class BankAccount:
def __init__(self):
self.balance = 0
self.lock = RLock() # 使用 RLock
def deposit(self, amount):
with self.lock:
self.balance += amount
print(f"存入 {amount},餘額 {self.balance}")
def withdraw(self, amount):
with self.lock:
if self.balance >= amount:
self.balance -= amount
print(f"提款 {amount},餘額 {self.balance}")
return True
return False
def transfer_to_savings(self, amount):
"""轉帳到儲蓄帳戶(需要再次獲取鎖)"""
with self.lock: # 第一次獲取
if self.withdraw(amount): # 內部又獲取了鎖(RLock 允許)
print(f"轉帳 {amount} 到儲蓄帳戶")
return True
return False
account = BankAccount()
account.deposit(1000)
account.transfer_to_savings(500)Lock vs RLock
| 特性 | Lock | RLock |
|---|---|---|
| 重複獲取 | ❌ 不可以(死鎖) | ✅ 可以 |
| 效能 | ⚡ 快 | 🐢 較慢 |
| 記憶體 | 小 | 大(需追蹤持有者) |
| 適用場景 | 簡單臨界區 | 遞迴、嵌套呼叫 |
3️⃣ Semaphore(信號量)
基本概念
Semaphore = 計數器,限制同時存取資源的 Thread 數量
from threading import Semaphore
# 創建信號量(最多 3 個 Thread)
semaphore = Semaphore(3)
# 獲取資源
semaphore.acquire() # 計數 -1
# 使用資源
# ...
# 釋放資源
semaphore.release() # 計數 +1完整範例:限制並發連線
from threading import Thread, Semaphore
import time
# 最多 3 個 Thread 同時存取資料庫
db_connection_pool = Semaphore(3)
def access_database(user_id):
print(f"用戶 {user_id} 等待資料庫連線...")
with db_connection_pool: # 獲取連線
print(f"用戶 {user_id} 連線到資料庫")
time.sleep(2) # 模擬查詢
print(f"用戶 {user_id} 查詢完成")
# 創建 10 個 Thread
threads = [Thread(target=access_database, args=(i,)) for i in range(10)]
for t in threads:
t.start()
for t in threads:
t.join()輸出:
用戶 0 等待資料庫連線...
用戶 0 連線到資料庫
用戶 1 等待資料庫連線...
用戶 1 連線到資料庫
用戶 2 等待資料庫連線...
用戶 2 連線到資料庫
用戶 3 等待資料庫連線... ← 等待中
用戶 4 等待資料庫連線... ← 等待中
...
用戶 0 查詢完成 ← 釋放連線
用戶 3 連線到資料庫 ← 獲得連線
...BoundedSemaphore(有界信號量)
from threading import BoundedSemaphore
# BoundedSemaphore 防止 release 超過初始值
semaphore = BoundedSemaphore(3)
semaphore.acquire()
semaphore.release()
semaphore.release()
semaphore.release()
semaphore.release() # ← ValueError! 超過初始值實戰案例:資源池
from threading import Thread, Semaphore
import time
class ResourcePool:
def __init__(self, max_resources):
self.semaphore = Semaphore(max_resources)
self.resources = [f"Resource-{i}" for i in range(max_resources)]
self.lock = Lock()
def acquire_resource(self):
"""獲取資源"""
self.semaphore.acquire()
with self.lock:
resource = self.resources.pop()
return resource
def release_resource(self, resource):
"""釋放資源"""
with self.lock:
self.resources.append(resource)
self.semaphore.release()
pool = ResourcePool(max_resources=3)
def worker(worker_id):
resource = pool.acquire_resource()
print(f"Worker {worker_id} 獲得 {resource}")
time.sleep(2)
print(f"Worker {worker_id} 釋放 {resource}")
pool.release_resource(resource)
threads = [Thread(target=worker, args=(i,)) for i in range(10)]
for t in threads:
t.start()
for t in threads:
t.join()4️⃣ Event(事件)
基本概念
Event = 信號機制,用於 Thread 間通知
from threading import Thread, Event
import time
event = Event()
def waiter():
print("等待事件...")
event.wait() # 阻塞直到事件被設置
print("事件發生,繼續執行")
def setter():
print("準備觸發事件...")
time.sleep(3)
event.set() # 設置事件
print("事件已觸發")
Thread(target=waiter).start()
Thread(target=setter).start()Event 方法
from threading import Event
event = Event()
# 設置事件
event.set()
# 清除事件
event.clear()
# 檢查事件狀態
if event.is_set():
print("事件已設置")
# 等待事件
event.wait() # 阻塞直到事件被設置
event.wait(timeout=5) # 最多等 5 秒實戰案例:啟動信號
from threading import Thread, Event
import time
ready_event = Event()
def initialize():
"""初始化服務"""
print("正在初始化...")
time.sleep(3) # 模擬初始化
print("初始化完成")
ready_event.set() # 通知已就緒
def worker(worker_id):
"""Worker 等待初始化完成"""
print(f"Worker {worker_id} 等待初始化...")
ready_event.wait() # 等待初始化
print(f"Worker {worker_id} 開始工作")
# 啟動初始化
Thread(target=initialize).start()
# 啟動 Worker
for i in range(3):
Thread(target=worker, args=(i,)).start()5️⃣ Condition(條件變數)
基本概念
Condition = Lock + Event 的組合,用於複雜的等待/通知場景
from threading import Condition
condition = Condition()
# 等待條件
with condition:
condition.wait() # 釋放鎖並等待通知
# 通知等待者
with condition:
condition.notify() # 通知一個等待者
condition.notify_all() # 通知所有等待者完整範例:生產者-消費者
from threading import Thread, Condition
import time
import random
class Queue:
def __init__(self, max_size):
self.items = []
self.max_size = max_size
self.condition = Condition()
def produce(self, item):
"""生產者"""
with self.condition:
# 等待隊列有空間
while len(self.items) >= self.max_size:
print(f"隊列已滿,等待...")
self.condition.wait()
self.items.append(item)
print(f"生產: {item},隊列: {len(self.items)}/{self.max_size}")
# 通知消費者
self.condition.notify()
def consume(self):
"""消費者"""
with self.condition:
# 等待隊列有資料
while len(self.items) == 0:
print(f"隊列為空,等待...")
self.condition.wait()
item = self.items.pop(0)
print(f"消費: {item},隊列: {len(self.items)}/{self.max_size}")
# 通知生產者
self.condition.notify()
return item
queue = Queue(max_size=5)
def producer():
for i in range(10):
queue.produce(f"Item-{i}")
time.sleep(random.uniform(0.1, 0.5))
def consumer():
for _ in range(10):
queue.consume()
time.sleep(random.uniform(0.2, 0.8))
Thread(target=producer).start()
Thread(target=consumer).start()6️⃣ Barrier(屏障)
基本概念
Barrier = 同步點,讓多個 Thread 在此等待,直到所有 Thread 都到達
from threading import Thread, Barrier
import time
# 創建屏障(3 個 Thread)
barrier = Barrier(3)
def worker(worker_id):
print(f"Worker {worker_id} 階段 1")
time.sleep(worker_id)
print(f"Worker {worker_id} 到達屏障")
barrier.wait() # 等待其他 Thread
print(f"Worker {worker_id} 階段 2")
threads = [Thread(target=worker, args=(i,)) for i in range(3)]
for t in threads:
t.start()
for t in threads:
t.join()輸出:
Worker 0 階段 1
Worker 1 階段 1
Worker 2 階段 1
Worker 0 到達屏障 ← 等待
Worker 1 到達屏障 ← 等待
Worker 2 到達屏障 ← 所有人到齊
Worker 0 階段 2 ← 同時繼續
Worker 1 階段 2
Worker 2 階段 2📊 同步機制對比
功能對比
| 機制 | 用途 | 適用場景 |
|---|---|---|
| Lock | 互斥存取 | 保護共享資源 |
| RLock | 可重入互斥 | 遞迴、嵌套呼叫 |
| Semaphore | 限制並發數 | 資源池、連線池 |
| Event | 簡單通知 | 啟動信號、停止信號 |
| Condition | 複雜等待/通知 | 生產者-消費者 |
| Barrier | 同步點 | 階段性同步 |
選擇指南
# 保護共享變數 → Lock
counter = 0
lock = Lock()
with lock:
counter += 1
# 遞迴函式 → RLock
rlock = RLock()
def recursive(n):
with rlock:
if n > 0:
recursive(n - 1)
# 限制並發 → Semaphore
pool = Semaphore(5)
with pool:
use_limited_resource()
# 等待初始化 → Event
ready = Event()
ready.wait()
# 生產者-消費者 → Condition
condition = Condition()
with condition:
condition.wait()
# 階段同步 → Barrier
barrier = Barrier(3)
barrier.wait()🎯 最佳實踐
1. 總是使用 with
# ❌ 錯誤:可能忘記釋放
lock.acquire()
do_something()
lock.release()
# ✅ 正確:自動釋放
with lock:
do_something()2. 避免嵌套鎖
# ❌ 危險:可能死鎖
with lock_a:
with lock_b:
pass
# ✅ 較好:按順序獲取
locks = [lock_a, lock_b]
locks.sort(key=id) # 按 ID 排序
for lock in locks:
lock.acquire()
try:
pass
finally:
for lock in reversed(locks):
lock.release()3. 設置合理的超時
# ✅ 使用超時避免永遠等待
if lock.acquire(timeout=5):
try:
do_something()
finally:
lock.release()
else:
print("獲取鎖超時")4. 選擇最簡單的機制
# 如果 Lock 夠用,不要用 RLock
# 如果 Event 夠用,不要用 Condition
# 越簡單越好!✅ 重點回顧
Lock(互斥鎖):
- ✅ 最基本的同步機制
- ✅ 保護臨界區
- ✅ 同一時間只有一個 Thread
- ❌ 不能重複獲取
RLock(可重入鎖):
- ✅ 可以重複獲取
- ✅ 適合遞迴、嵌套
- ⚠️ 效能較 Lock 差
Semaphore(信號量):
- ✅ 限制並發數量
- ✅ 適合資源池
- ✅ 計數器機制
Event(事件):
- ✅ 簡單的通知機制
- ✅ 一對多通知
- ✅ 適合啟動/停止信號
Condition(條件變數):
- ✅ 複雜的等待/通知
- ✅ 生產者-消費者模式
- ✅ Lock + Event 的組合
Barrier(屏障):
- ✅ 同步點
- ✅ 等待所有 Thread 到達
- ✅ 階段性同步
關鍵原則:
- ✅ 總是使用
with語句 - ✅ 避免嵌套鎖(防止死鎖)
- ✅ 設置合理超時
- ✅ 選擇最簡單的機制
上一篇: 04-3. Deadlock(死鎖)
最後更新:2025-01-06