目錄
04-3. Deadlock(死鎖)
⏱️ 閱讀時間: 12 分鐘 🎯 難度: ⭐⭐⭐ (中等)
🎯 本篇重點
深入理解 Deadlock 的成因、必要條件、檢測與預防方法,學習如何避免這個可能導致程式完全卡死的嚴重問題。
🤔 什麼是 Deadlock?
Deadlock(死鎖) = 多個 Thread 互相等待對方釋放資源,導致所有 Thread 永遠阻塞
一句話解釋: 當兩個或多個 Thread 互相等待對方持有的資源,而誰都無法繼續執行,就發生了 Deadlock。
🚗 用十字路口來比喻
經典的十字路口 Deadlock
↑ 車 C
│
車 B ←──┼──→ 車 D
│
↓ 車 A
狀況:
- 車 A 等車 B 離開
- 車 B 等車 C 離開
- 車 C 等車 D 離開
- 車 D 等車 A 離開
結果:所有車都動不了!這就是 Deadlock💻 經典 Deadlock 案例
案例 1:哲學家就餐問題
from threading import Thread, Lock
import time
# 5 支筷子(Lock)
chopsticks = [Lock() for _ in range(5)]
def philosopher(philosopher_id):
"""哲學家進餐"""
left = philosopher_id
right = (philosopher_id + 1) % 5
print(f"哲學家 {philosopher_id} 開始思考")
time.sleep(1)
print(f"哲學家 {philosopher_id} 嘗試拿起左筷子 {left}")
chopsticks[left].acquire()
print(f"哲學家 {philosopher_id} 拿起了左筷子 {left}")
time.sleep(0.1) # 製造 Deadlock 的機會
print(f"哲學家 {philosopher_id} 嘗試拿起右筷子 {right}")
chopsticks[right].acquire() # ← 可能永遠等待
print(f"哲學家 {philosopher_id} 拿起了右筷子 {right}")
print(f"哲學家 {philosopher_id} 開始進餐")
time.sleep(2)
chopsticks[left].release()
chopsticks[right].release()
print(f"哲學家 {philosopher_id} 進餐完畢")
# 創建 5 個哲學家
philosophers = [Thread(target=philosopher, args=(i,)) for i in range(5)]
for p in philosophers:
p.start()
for p in philosophers:
p.join() # ← 可能永遠等待(Deadlock)可能的輸出(Deadlock):
哲學家 0 開始思考
哲學家 1 開始思考
...
哲學家 0 拿起了左筷子 0
哲學家 1 拿起了左筷子 1
哲學家 2 拿起了左筷子 2
哲學家 3 拿起了左筷子 3
哲學家 4 拿起了左筷子 4
哲學家 0 嘗試拿起右筷子 1 ← 被哲學家 1 持有
哲學家 1 嘗試拿起右筷子 2 ← 被哲學家 2 持有
...(所有人互相等待,程式卡死)案例 2:銀行轉帳 Deadlock
from threading import Thread, Lock
import time
class Account:
def __init__(self, account_id, balance):
self.account_id = account_id
self.balance = balance
self.lock = Lock()
def transfer_unsafe(from_account, to_account, amount):
"""不安全的轉帳(可能 Deadlock)"""
print(f"轉帳: {from_account.account_id} → {to_account.account_id}, {amount}")
# 獲取源帳戶鎖
from_account.lock.acquire()
print(f"已鎖定帳戶 {from_account.account_id}")
time.sleep(0.1) # 模擬處理時間
# 獲取目標帳戶鎖
to_account.lock.acquire()
print(f"已鎖定帳戶 {to_account.account_id}")
# 執行轉帳
from_account.balance -= amount
to_account.balance += amount
# 釋放鎖
to_account.lock.release()
from_account.lock.release()
# 創建兩個帳戶
account_a = Account('A', 1000)
account_b = Account('B', 1000)
# Thread 1: A → B
# Thread 2: B → A(同時進行)
t1 = Thread(target=transfer_unsafe, args=(account_a, account_b, 100))
t2 = Thread(target=transfer_unsafe, args=(account_b, account_a, 200))
t1.start()
t2.start()
t1.join() # ← 可能永遠等待
t2.join()Deadlock 發生過程:
時間 T0:
- Thread 1: 鎖定帳戶 A
- Thread 2: 鎖定帳戶 B
時間 T1:
- Thread 1: 等待帳戶 B 的鎖(被 Thread 2 持有)
- Thread 2: 等待帳戶 A 的鎖(被 Thread 1 持有)
結果:Deadlock!📋 Deadlock 的四個必要條件
Deadlock 發生必須同時滿足以下四個條件(Coffman 條件):
1. Mutual Exclusion(互斥)
定義: 資源一次只能被一個 Thread 使用
lock = Lock()
# 同一時間只有一個 Thread 可以獲得鎖
with lock:
# 臨界區
pass2. Hold and Wait(持有並等待)
定義: Thread 持有至少一個資源,同時等待其他資源
lock_a.acquire() # 持有 Lock A
# ...
lock_b.acquire() # 等待 Lock B3. No Preemption(不可搶占)
定義: 資源不能被強制從 Thread 中奪走,只能主動釋放
# 無法強制其他 Thread 釋放 Lock
lock.acquire() # 只能等待4. Circular Wait(循環等待)
定義: 存在一個 Thread 鏈,每個 Thread 都在等待下一個 Thread 持有的資源
Thread A → 等待 Lock 1 → 被 Thread B 持有
Thread B → 等待 Lock 2 → 被 Thread C 持有
Thread C → 等待 Lock 3 → 被 Thread A 持有
形成循環!🛡️ 預防 Deadlock
策略 1:Lock 順序(破壞循環等待)
from threading import Thread, Lock
import time
class Account:
def __init__(self, account_id, balance):
self.account_id = account_id
self.balance = balance
self.lock = Lock()
def transfer_safe(from_account, to_account, amount):
"""安全的轉帳(按 ID 順序鎖定)"""
# 總是按帳戶 ID 順序鎖定
first_lock = from_account if from_account.account_id < to_account.account_id else to_account
second_lock = to_account if from_account.account_id < to_account.account_id else from_account
with first_lock.lock:
time.sleep(0.1)
with second_lock.lock:
# 執行轉帳
from_account.balance -= amount
to_account.balance += amount
print(f"轉帳完成: {from_account.account_id} → {to_account.account_id}, {amount}")
# 測試
account_a = Account('A', 1000)
account_b = Account('B', 1000)
t1 = Thread(target=transfer_safe, args=(account_a, account_b, 100))
t2 = Thread(target=transfer_safe, args=(account_b, account_a, 200))
t1.start(); t2.start()
t1.join(); t2.join()
print(f"帳戶 A: {account_a.balance}")
print(f"帳戶 B: {account_b.balance}")為什麼安全?
Thread 1: A → B
- 先鎖 A(ID 較小)
- 再鎖 B
Thread 2: B → A
- 先鎖 A(ID 較小)← 等待 Thread 1
- 再鎖 B
沒有循環等待,不會 Deadlock!策略 2:哲學家就餐的解決方案
方案 A:限制並發數量
from threading import Thread, Lock, Semaphore
import time
chopsticks = [Lock() for _ in range(5)]
# 最多 4 個哲學家同時進餐
max_diners = Semaphore(4)
def philosopher_safe_v1(philosopher_id):
left = philosopher_id
right = (philosopher_id + 1) % 5
with max_diners: # 限制並發
chopsticks[left].acquire()
chopsticks[right].acquire()
print(f"哲學家 {philosopher_id} 正在進餐")
time.sleep(2)
chopsticks[right].release()
chopsticks[left].release()方案 B:奇偶策略
def philosopher_safe_v2(philosopher_id):
left = philosopher_id
right = (philosopher_id + 1) % 5
# 奇數哲學家先拿右筷子
if philosopher_id % 2 == 1:
left, right = right, left
chopsticks[left].acquire()
chopsticks[right].acquire()
print(f"哲學家 {philosopher_id} 正在進餐")
time.sleep(2)
chopsticks[right].release()
chopsticks[left].release()策略 3:超時機制(破壞持有並等待)
from threading import Lock
import time
def try_acquire_locks(lock_a, lock_b, timeout=1):
"""嘗試獲取鎖,超時則釋放已持有的鎖"""
start_time = time.time()
# 獲取第一個鎖
if not lock_a.acquire(timeout=timeout):
return False
# 嘗試獲取第二個鎖
remaining_time = timeout - (time.time() - start_time)
if not lock_b.acquire(timeout=max(0, remaining_time)):
# 獲取失敗,釋放第一個鎖
lock_a.release()
return False
return True
def transfer_with_timeout(from_account, to_account, amount):
"""帶超時的轉帳"""
max_retries = 3
for attempt in range(max_retries):
if try_acquire_locks(from_account.lock, to_account.lock, timeout=1):
try:
# 執行轉帳
from_account.balance -= amount
to_account.balance += amount
print(f"轉帳成功(第 {attempt + 1} 次嘗試)")
return True
finally:
to_account.lock.release()
from_account.lock.release()
else:
print(f"獲取鎖失敗,重試... ({attempt + 1}/{max_retries})")
time.sleep(0.1) # 短暫等待後重試
print("轉帳失敗:超過最大重試次數")
return False策略 4:一次性獲取所有資源
from threading import Lock
class GlobalLock:
"""全域鎖,用於協調多個資源的獲取"""
def __init__(self):
self.lock = Lock()
def acquire_all(self, *locks):
"""一次性獲取所有鎖"""
with self.lock:
for lock in locks:
lock.acquire()
def release_all(self, *locks):
"""釋放所有鎖"""
for lock in locks:
lock.release()
global_lock = GlobalLock()
def transfer_atomic(from_account, to_account, amount):
"""原子性轉帳"""
global_lock.acquire_all(from_account.lock, to_account.lock)
try:
from_account.balance -= amount
to_account.balance += amount
print("轉帳完成")
finally:
global_lock.release_all(from_account.lock, to_account.lock)🔍 檢測 Deadlock
方法 1:Thread Dump
import threading
import sys
def print_thread_dump():
"""打印所有 Thread 的狀態"""
print("\n=== Thread Dump ===")
for thread in threading.enumerate():
print(f"Thread: {thread.name}")
print(f" Alive: {thread.is_alive()}")
print(f" Daemon: {thread.daemon}")
print(f" Ident: {thread.ident}")
print("==================\n")
# 定期檢查
import time
while True:
time.sleep(10)
print_thread_dump()方法 2:超時檢測
from threading import Thread
import time
def worker_with_deadlock():
# 可能會 Deadlock 的程式碼
pass
# 使用 timeout 檢測
t = Thread(target=worker_with_deadlock)
t.start()
t.join(timeout=5) # 最多等 5 秒
if t.is_alive():
print("⚠️ 可能發生 Deadlock!Thread 仍在執行")
# 無法強制終止 Python Thread,需要重新設計
else:
print("✅ Thread 正常結束")方法 3:資源分配圖
class DeadlockDetector:
"""簡單的 Deadlock 檢測器"""
def __init__(self):
self.wait_graph = {} # Thread → 等待的資源
self.hold_graph = {} # Thread → 持有的資源
def add_wait(self, thread_id, resource_id):
"""Thread 等待資源"""
if thread_id not in self.wait_graph:
self.wait_graph[thread_id] = []
self.wait_graph[thread_id].append(resource_id)
self.check_cycle()
def add_hold(self, thread_id, resource_id):
"""Thread 持有資源"""
if thread_id not in self.hold_graph:
self.hold_graph[thread_id] = []
self.hold_graph[thread_id].append(resource_id)
def check_cycle(self):
"""檢測循環等待"""
# 簡化版:檢測是否有循環
visited = set()
def dfs(thread_id, path):
if thread_id in path:
print(f"⚠️ 檢測到 Deadlock: {path + [thread_id]}")
return True
if thread_id in visited:
return False
visited.add(thread_id)
path.append(thread_id)
# 找到這個 Thread 等待的資源
if thread_id in self.wait_graph:
for resource in self.wait_graph[thread_id]:
# 找到持有這個資源的 Thread
for holder in self.hold_graph:
if resource in self.hold_graph[holder]:
if dfs(holder, path):
return True
path.pop()
return False
for thread in self.wait_graph:
if dfs(thread, []):
return True
return False🎯 Deadlock 處理策略總結
預防(Prevention)
破壞四個必要條件之一:
| 條件 | 破壞方法 |
|---|---|
| 互斥 | 使用無鎖資料結構(不實際) |
| 持有並等待 | 一次性獲取所有資源 |
| 不可搶占 | 使用超時,失敗則釋放 |
| 循環等待 | 資源排序,按順序獲取 |
避免(Avoidance)
# 銀行家算法(Banker's Algorithm)
# 在分配資源前檢查是否會導致 Deadlock
def is_safe_state(available, allocation, need):
"""檢查是否為安全狀態"""
# 實作銀行家算法
pass檢測與恢復(Detection & Recovery)
# 1. 定期檢測 Deadlock
# 2. 發現後終止部分 Thread 或回滾
def detect_and_recover():
if deadlock_detected():
# 選擇犧牲者(Victim Selection)
victim_thread = select_victim()
# 終止或回滾
terminate_thread(victim_thread)✅ 重點回顧
Deadlock 定義:
- 多個 Thread 互相等待資源
- 導致所有 Thread 永遠阻塞
- 程式完全卡死
四個必要條件(Coffman):
- 互斥 - 資源獨佔
- 持有並等待 - 持有資源同時等待
- 不可搶占 - 無法強制奪取
- 循環等待 - 形成等待循環
預防策略:
- ✅ Lock 順序 - 總是按相同順序獲取鎖
- ✅ 限制並發 - 減少競爭
- ✅ 超時機制 - 失敗則釋放並重試
- ✅ 一次性獲取 - 獲取所有資源或都不獲取
檢測方法:
- ✅ Thread Dump
- ✅ 超時檢測
- ✅ 資源分配圖分析
經典問題:
- 🍴 哲學家就餐問題
- 💰 銀行轉帳問題
- 🚗 十字路口問題
關鍵理解:
- ⚠️ Deadlock 很難除錯(程式卡死)
- ⚠️ 必須在設計階段預防
- ⚠️ 一旦發生,通常只能重啟
- ⚠️ Lock 順序是最簡單有效的預防方法
上一篇: 04-2. Race Condition(競態條件) 下一篇: 04-4. Lock / Mutex / Semaphore
最後更新:2025-01-06