目錄
02-4. Thread 同步機制
⏱️ 閱讀時間: 15 分鐘 🎯 難度: ⭐⭐⭐ (中等)
🤔 一句話解釋
Thread 同步機制用於協調多個 Thread 的執行順序,避免 Race Condition 和資料不一致問題。
🚦 為什麼需要同步機制?
問題:Race Condition
from threading import Thread
balance = 1000 # 銀行帳戶餘額
def withdraw(amount):
global balance
# 危險操作!分三步:
temp = balance # 1. 讀取
temp = temp - amount # 2. 計算
balance = temp # 3. 寫回
# 兩個 Thread 同時提款
t1 = Thread(target=withdraw, args=(500,))
t2 = Thread(target=withdraw, args=(300,))
t1.start()
t2.start()
t1.join()
t2.join()
print(f"餘額: {balance}")
# 預期:200 (1000 - 500 - 300)
# 實際:可能是 700(資料丟失!)問題分析:
初始餘額:1000
Thread 1: READ balance = 1000
Thread 2: READ balance = 1000 ← 同時讀取
Thread 1: CALC 1000 - 500 = 500
Thread 2: CALC 1000 - 300 = 700
Thread 1: WRITE balance = 500
Thread 2: WRITE balance = 700 ← 覆蓋了 Thread 1 的結果
💥 應該是 200,結果是 700!🔒 同步機制 1:Lock(互斥鎖)
定義:確保同一時間只有一個 Thread 能執行臨界區程式碼。
基本用法
from threading import Thread, Lock
balance = 1000
lock = Lock() # 創建鎖
def withdraw(amount):
global balance
lock.acquire() # 🔒 加鎖
try:
temp = balance
temp = temp - amount
balance = temp
print(f"提款 {amount},餘額: {balance}")
finally:
lock.release() # 🔓 解鎖
t1 = Thread(target=withdraw, args=(500,))
t2 = Thread(target=withdraw, args=(300,))
t1.start()
t2.start()
t1.join()
t2.join()
print(f"最終餘額: {balance}")
# 輸出:最終餘額: 200 ← 正確!使用 with 語句(推薦)
from threading import Thread, Lock
balance = 1000
lock = Lock()
def withdraw(amount):
global balance
with lock: # ✅ 自動 acquire() 和 release()
temp = balance
temp = temp - amount
balance = temp
print(f"提款 {amount},餘額: {balance}")
# 測試
threads = [Thread(target=withdraw, args=(100,)) for _ in range(5)]
for t in threads:
t.start()
for t in threads:
t.join()
print(f"最終餘額: {balance}")
# 輸出:最終餘額: 500 ← 正確!Lock 的視覺化
Thread 1: [等待 Lock] → [獲得 Lock] → [執行臨界區] → [釋放 Lock]
↓
Thread 2: [等待 Lock] ←────────────────────┘ → [獲得 Lock] → ...
↓
Thread 3: [等待 Lock] ←─────────────────────────────┘關鍵:
- 一次只能有一個 Thread 持有 Lock
- 其他 Thread 必須等待
- 避免了 Race Condition
🔁 同步機制 2:RLock(可重入鎖)
定義:同一個 Thread 可以多次獲取同一個 RLock,不會死鎖。
Lock 的問題
from threading import Lock
lock = Lock()
def outer():
with lock:
print("Outer 獲得鎖")
inner() # 💀 死鎖!
def inner():
with lock: # ❌ 同一個 Thread 再次嘗試獲取鎖
print("Inner 獲得鎖")
outer()
# 程式卡死!RLock 的解決方案
from threading import RLock
rlock = RLock() # 可重入鎖
def outer():
with rlock:
print("Outer 獲得鎖")
inner() # ✅ 可以再次獲取
def inner():
with rlock: # ✅ 同一個 Thread 可以重入
print("Inner 獲得鎖")
outer()輸出:
Outer 獲得鎖
Inner 獲得鎖RLock 的計數機制
from threading import RLock
rlock = RLock()
# 同一個 Thread 多次獲取
rlock.acquire() # 計數:1
print("第一次獲取")
rlock.acquire() # 計數:2
print("第二次獲取")
rlock.acquire() # 計數:3
print("第三次獲取")
# 必須釋放相同次數
rlock.release() # 計數:2
rlock.release() # 計數:1
rlock.release() # 計數:0,真正釋放🚦 同步機制 3:Semaphore(號誌/信號量)
定義:允許固定數量的 Thread 同時訪問資源。
限制並發數量
from threading import Thread, Semaphore
import time
# 最多 3 個 Thread 同時訪問
semaphore = Semaphore(3)
def access_resource(name):
print(f"{name} 等待進入")
with semaphore: # 🚦 獲取信號量
print(f"{name} 進入資源區")
time.sleep(2) # 模擬使用資源
print(f"{name} 離開資源區")
# 創建 10 個 Thread
threads = [Thread(target=access_resource, args=(f'Thread-{i}',)) for i in range(10)]
for t in threads:
t.start()
for t in threads:
t.join()輸出:
Thread-0 等待進入
Thread-1 等待進入
Thread-2 等待進入
Thread-0 進入資源區 ← 前 3 個同時進入
Thread-1 進入資源區
Thread-2 進入資源區
Thread-3 等待進入 ← 後面的等待
Thread-4 等待進入
...
(2 秒後)
Thread-0 離開資源區
Thread-3 進入資源區 ← 有位置後進入實際案例:資料庫連線池
from threading import Thread, Semaphore
import time
class ConnectionPool:
def __init__(self, max_connections):
self.semaphore = Semaphore(max_connections)
def query(self, query_id):
print(f"查詢 {query_id} 等待連線")
with self.semaphore:
print(f"查詢 {query_id} 獲得連線")
time.sleep(1) # 模擬查詢
print(f"查詢 {query_id} 完成")
# 資料庫連線池:最多 5 個連線
pool = ConnectionPool(max_connections=5)
# 10 個查詢請求
threads = [Thread(target=pool.query, args=(i,)) for i in range(10)]
for t in threads:
t.start()
for t in threads:
t.join()🔔 同步機制 4:Event(事件)
定義:一個 Thread 發送信號,其他 Thread 等待信號。
基本用法
from threading import Thread, Event
import time
# 創建事件
event = Event()
def waiter(name):
print(f"{name} 等待信號")
event.wait() # ⏸️ 阻塞,直到收到信號
print(f"{name} 收到信號,開始工作")
def signaler():
print("Signaler: 準備中...")
time.sleep(3)
print("Signaler: 發送信號!")
event.set() # 🔔 發送信號
# 創建等待者
w1 = Thread(target=waiter, args=('Waiter-1',))
w2 = Thread(target=waiter, args=('Waiter-2',))
w3 = Thread(target=waiter, args=('Waiter-3',))
# 創建信號發送者
s = Thread(target=signaler)
# 啟動
w1.start()
w2.start()
w3.start()
s.start()
w1.join()
w2.join()
w3.join()
s.join()輸出:
Waiter-1 等待信號
Waiter-2 等待信號
Waiter-3 等待信號
Signaler: 準備中...
(等待 3 秒...)
Signaler: 發送信號!
Waiter-1 收到信號,開始工作
Waiter-2 收到信號,開始工作
Waiter-3 收到信號,開始工作實際案例:伺服器啟動通知
from threading import Thread, Event
import time
server_started = Event()
def start_server():
print("伺服器啟動中...")
time.sleep(3) # 模擬啟動過程
print("伺服器啟動完成!")
server_started.set() # 🔔 通知已啟動
def client_request(client_id):
print(f"客戶端 {client_id} 等待伺服器啟動")
server_started.wait() # ⏸️ 等待伺服器啟動
print(f"客戶端 {client_id} 發送請求")
# 啟動伺服器
server = Thread(target=start_server)
server.start()
# 創建客戶端
clients = [Thread(target=client_request, args=(i,)) for i in range(5)]
for c in clients:
c.start()
server.join()
for c in clients:
c.join()Event 的方法
from threading import Event
event = Event()
# 檢查狀態
print(event.is_set()) # False
# 設置信號
event.set()
print(event.is_set()) # True
# 清除信號
event.clear()
print(event.is_set()) # False
# 等待信號(可設超時)
event.wait(timeout=5) # 最多等 5 秒🚧 同步機制 5:Condition(條件變數)
定義:複雜的等待/通知機制,支援多個條件。
生產者-消費者模式
from threading import Thread, Condition
import time
# 共享資源
queue = []
condition = Condition()
def producer():
for i in range(5):
with condition:
queue.append(i)
print(f"生產: {i},佇列: {queue}")
condition.notify() # 🔔 通知消費者
time.sleep(1)
def consumer():
for _ in range(5):
with condition:
while len(queue) == 0:
print("消費者: 佇列為空,等待...")
condition.wait() # ⏸️ 等待生產者
item = queue.pop(0)
print(f"消費: {item},佇列: {queue}")
p = Thread(target=producer)
c = Thread(target=consumer)
c.start()
time.sleep(0.1) # 確保消費者先啟動
p.start()
p.join()
c.join()輸出:
消費者: 佇列為空,等待...
生產: 0,佇列: [0]
消費: 0,佇列: []
消費者: 佇列為空,等待...
生產: 1,佇列: [1]
消費: 1,佇列: []
...Condition 的方法
from threading import Condition
condition = Condition()
# 等待
condition.wait() # 等待通知
condition.wait(timeout=5) # 最多等 5 秒
# 通知
condition.notify() # 通知一個等待者
condition.notify_all() # 通知所有等待者🏁 同步機制 6:Barrier(屏障)
定義:等待所有 Thread 到達某個點後,才一起繼續執行。
基本用法
from threading import Thread, Barrier
import time
import random
# 創建 Barrier:等待 3 個 Thread
barrier = Barrier(3)
def runner(name):
print(f"{name} 開始準備")
time.sleep(random.randint(1, 3)) # 準備時間不同
print(f"{name} 準備完成,等待其他人")
barrier.wait() # 🏁 等待所有人到達
print(f"{name} 開始跑!")
# 創建 3 個跑者
threads = [Thread(target=runner, args=(f'跑者-{i}',)) for i in range(3)]
for t in threads:
t.start()
for t in threads:
t.join()輸出:
跑者-0 開始準備
跑者-1 開始準備
跑者-2 開始準備
跑者-1 準備完成,等待其他人
跑者-0 準備完成,等待其他人
跑者-2 準備完成,等待其他人
跑者-2 開始跑! ← 所有人到齊後一起開始
跑者-0 開始跑!
跑者-1 開始跑!實際案例:並行測試
from threading import Thread, Barrier
import time
# 3 個測試 Thread 同時開始
barrier = Barrier(3)
def performance_test(test_id):
print(f"測試 {test_id} 初始化中")
time.sleep(1)
print(f"測試 {test_id} 準備就緒")
barrier.wait() # 🏁 等待所有測試準備好
# 所有測試同時開始
start = time.time()
time.sleep(2) # 模擬測試
elapsed = time.time() - start
print(f"測試 {test_id} 完成,耗時: {elapsed:.2f} 秒")
threads = [Thread(target=performance_test, args=(i,)) for i in range(3)]
for t in threads:
t.start()
for t in threads:
t.join()📊 同步機制對比
| 機制 | 用途 | 典型場景 |
|---|---|---|
| Lock | 互斥鎖,一次一個 Thread | 保護共享資源 |
| RLock | 可重入鎖 | 遞迴函數、嵌套調用 |
| Semaphore | 限制並發數量 | 連線池、資源限制 |
| Event | 信號通知 | 啟動通知、狀態廣播 |
| Condition | 複雜等待/通知 | 生產者-消費者 |
| Barrier | 同步點 | 並行測試、分階段任務 |
💡 最佳實踐
1. 始終使用 with 語句
# ❌ 不好:手動管理
lock.acquire()
try:
# 臨界區
pass
finally:
lock.release()
# ✅ 好:自動管理
with lock:
# 臨界區
pass2. 避免死鎖:固定鎖順序
lock1 = Lock()
lock2 = Lock()
# ✅ 所有 Thread 按相同順序獲取鎖
def thread_func():
with lock1: # 先鎖 lock1
with lock2: # 再鎖 lock2
# 臨界區
pass3. 使用 timeout 避免永久阻塞
from threading import Lock
lock = Lock()
if lock.acquire(timeout=5): # 最多等 5 秒
try:
# 臨界區
pass
finally:
lock.release()
else:
print("無法獲取鎖,超時")4. 減小臨界區範圍
# ❌ 不好:臨界區過大
with lock:
data = fetch_data() # 慢
result = process(data) # 慢
save(result)
# ✅ 好:只保護必要部分
data = fetch_data()
result = process(data)
with lock:
save(result) # 只鎖寫入⚠️ 常見錯誤
1. 忘記釋放鎖
# ❌ 錯誤
lock.acquire()
if error:
return # 忘記 release()!
lock.release()
# ✅ 正確
with lock:
if error:
return # with 會自動 release2. 鎖的粒度過粗
# ❌ 不好:整個函數都鎖住
with lock:
# 100 行程式碼...
pass
# ✅ 好:只鎖必要部分
# 前處理
with lock:
# 只鎖關鍵部分
pass
# 後處理3. 在 Lock 內等待另一個 Lock
# ❌ 可能死鎖
with lock1:
with lock2: # 危險!
pass
# ✅ 使用固定順序或避免嵌套✅ 重點回顧
六大同步機制:
- Lock - 基本互斥鎖
- RLock - 可重入鎖
- Semaphore - 限制並發數
- Event - 事件通知
- Condition - 條件變數
- Barrier - 同步屏障
使用原則:
- ✅ 使用
with語句 - ✅ 固定鎖順序
- ✅ 減小臨界區
- ✅ 設置 timeout
- ✅ 避免嵌套鎖
避免問題:
- ❌ Race Condition
- ❌ Deadlock
- ❌ 忘記釋放鎖
- ❌ 鎖粒度過粗
上一篇: 02-3. Thread 的生命週期 下一篇: 02-5. Thread Pool 與實戰應用
最後更新:2025-01-06