目錄
02-3. Thread 的生命週期
⏱️ 閱讀時間: 12 分鐘 🎯 難度: ⭐⭐ (簡單)
🤔 一句話解釋
Thread 從創建到終止會經歷:New → Runnable → Running → (Blocked) → Terminated 五個狀態。
🔄 Thread 生命週期圖
創建 Thread
↓
┌─────────┐
│ New │ ← Thread 已創建,但未啟動
└─────────┘
↓ start()
┌─────────┐
│Runnable │ ← 就緒狀態,等待 CPU
└─────────┘
↓ OS 調度
┌─────────┐
│ Running │ ← 正在執行
└─────────┘
↓ 完成 / sleep() / wait() / I/O
┌─────────┐ ┌─────────┐
│ Blocked │ ←────→ │Runnable │
└─────────┘ └─────────┘
↓ 任務完成
┌─────────┐
│Terminated│ ← Thread 結束
└─────────┘📍 狀態 1:New(新建狀態)
定義:Thread 對象已創建,但尚未呼叫 start() 方法。
from threading import Thread
def worker():
print("Worker 執行中")
# 創建 Thread
t = Thread(target=worker)
print(f"Thread 狀態: {t.is_alive()}") # False
# ✅ Thread 已創建
# ❌ 尚未啟動
# ❌ 不佔用 CPU 資源輸出:
Thread 狀態: False特點:
- Thread 對象已在記憶體中
- 尚未分配系統資源
- 不參與 CPU 調度
📍 狀態 2:Runnable(就緒狀態)
定義:Thread 已呼叫 start() 方法,進入就緒佇列,等待 CPU 調度。
from threading import Thread
import time
def worker(name):
print(f"{name} 開始執行")
time.sleep(1)
print(f"{name} 完成")
# 創建並啟動 Thread
t1 = Thread(target=worker, args=('Thread-1',))
t2 = Thread(target=worker, args=('Thread-2',))
t1.start() # 進入 Runnable 狀態
t2.start() # 進入 Runnable 狀態
print(f"t1 is alive: {t1.is_alive()}") # True
print(f"t2 is alive: {t2.is_alive()}") # True輸出:
t1 is alive: True
t2 is alive: True
Thread-1 開始執行
Thread-2 開始執行
(等待 1 秒...)
Thread-1 完成
Thread-2 完成特點:
- Thread 已準備好執行
- 在就緒佇列(Ready Queue)中等待
- 等待 OS 分配 CPU 時間片
📍 狀態 3:Running(執行狀態)
定義:Thread 獲得 CPU 時間片,正在執行。
from threading import Thread, current_thread
import time
def worker():
# 正在執行的 Thread
print(f"執行中的 Thread: {current_thread().name}")
time.sleep(0.5)
# 創建多個 Thread
threads = [Thread(target=worker, name=f'Worker-{i}') for i in range(3)]
# 啟動所有 Thread
for t in threads:
t.start()
# 主 Thread 也在執行
print(f"主 Thread: {current_thread().name}")
for t in threads:
t.join()輸出:
主 Thread: MainThread
執行中的 Thread: Worker-0
執行中的 Thread: Worker-1
執行中的 Thread: Worker-2特點:
- Thread 正在使用 CPU
- 執行 Thread 的程式碼
- 可能被打斷(Time Slice 用完)
📍 狀態 4:Blocked(阻塞狀態)
定義:Thread 因某些原因無法執行,等待特定條件滿足。
阻塞原因 1:等待 Lock
from threading import Thread, Lock
import time
lock = Lock()
def worker(name):
print(f"{name} 嘗試獲取 Lock")
lock.acquire()
print(f"{name} 獲得 Lock,開始工作")
time.sleep(2) # 持有 Lock 2 秒
print(f"{name} 釋放 Lock")
lock.release()
t1 = Thread(target=worker, args=('Thread-1',))
t2 = Thread(target=worker, args=('Thread-2',))
t1.start()
time.sleep(0.1) # 確保 t1 先獲得 Lock
t2.start() # t2 會被阻塞
t1.join()
t2.join()輸出:
Thread-1 嘗試獲取 Lock
Thread-1 獲得 Lock,開始工作
Thread-2 嘗試獲取 Lock ← Thread-2 阻塞中
(等待 2 秒...)
Thread-1 釋放 Lock
Thread-2 獲得 Lock,開始工作 ← Thread-2 解除阻塞
(等待 2 秒...)
Thread-2 釋放 Lock狀態轉換:
Thread-2: Runnable → 嘗試獲取 Lock → Blocked(等待 Lock)
↓
Lock 釋放
↓
Thread-2: Blocked → Runnable → Running阻塞原因 2:sleep()
from threading import Thread
import time
def worker(name):
print(f"{name} 開始")
print(f"{name} 進入睡眠")
time.sleep(3) # 阻塞 3 秒
print(f"{name} 醒來")
t = Thread(target=worker, args=('Worker',))
t.start()
print(f"is_alive (睡眠中): {t.is_alive()}") # True,但是阻塞中
time.sleep(4)
print(f"is_alive (醒來後): {t.is_alive()}") # False(已完成)輸出:
Worker 開始
Worker 進入睡眠
is_alive (睡眠中): True
(等待 3 秒...)
Worker 醒來
is_alive (醒來後): False狀態轉換:
Running → sleep() → Blocked
↓
睡眠時間到
↓
Blocked → Runnable → Running阻塞原因 3:等待 I/O
from threading import Thread
import time
def read_file():
print("開始讀取檔案")
with open('large_file.txt', 'r') as f: # I/O 操作,阻塞中
content = f.read()
print("檔案讀取完成")
def compute():
print("開始計算")
total = sum(range(10000000))
print(f"計算完成: {total}")
# Thread-1: I/O 阻塞
t1 = Thread(target=read_file)
# Thread-2: CPU 運算
t2 = Thread(target=compute)
t1.start()
t2.start()
t1.join()
t2.join()狀態轉換:
Thread-1 (I/O):
Running → 等待硬碟讀取 → Blocked
↓
資料讀取完成
↓
Blocked → Runnable → Running
Thread-2 (CPU):
Running → 計算中 → Running → Terminated阻塞原因 4:join() 等待另一個 Thread
from threading import Thread
import time
def worker(name, duration):
print(f"{name} 開始工作")
time.sleep(duration)
print(f"{name} 完成")
def coordinator():
print("Coordinator: 創建 Worker")
t = Thread(target=worker, args=('Worker', 3))
t.start()
print("Coordinator: 等待 Worker 完成")
t.join() # ← Coordinator 阻塞在這裡
print("Coordinator: Worker 已完成,繼續執行")
c = Thread(target=coordinator)
c.start()
c.join()輸出:
Coordinator: 創建 Worker
Coordinator: 等待 Worker 完成
Worker 開始工作
(等待 3 秒...)
Worker 完成
Coordinator: Worker 已完成,繼續執行📍 狀態 5:Terminated(終止狀態)
定義:Thread 執行完成或因異常而終止。
正常終止
from threading import Thread
def worker():
print("開始工作")
return "完成" # 正常結束
t = Thread(target=worker)
t.start()
t.join()
print(f"Thread 狀態: {t.is_alive()}") # False輸出:
開始工作
Thread 狀態: False異常終止
from threading import Thread
import time
def risky_worker():
print("開始工作")
time.sleep(1)
raise Exception("發生錯誤!") # 異常
def safe_worker():
for i in range(5):
print(f"安全 Worker: {i}")
time.sleep(0.5)
t1 = Thread(target=risky_worker)
t2 = Thread(target=safe_worker)
t1.start()
t2.start()
t1.join()
t2.join()輸出:
開始工作
安全 Worker: 0
Exception in thread Thread-1:
...
安全 Worker: 1
...注意:
- Thread 異常不會影響其他 Thread
- 但會導致該 Thread 提前終止
🔍 完整生命週期範例
from threading import Thread, current_thread, Lock
import time
lock = Lock()
def lifecycle_demo(name):
print(f"[{name}] 狀態: Runnable(等待 CPU)")
# Running
print(f"[{name}] 狀態: Running(開始執行)")
# Blocked - sleep
print(f"[{name}] 狀態: Blocked(睡眠 1 秒)")
time.sleep(1)
# Running again
print(f"[{name}] 狀態: Running(睡眠結束)")
# Blocked - waiting for Lock
print(f"[{name}] 狀態: Blocked(等待 Lock)")
with lock:
print(f"[{name}] 狀態: Running(獲得 Lock)")
time.sleep(0.5)
# Terminated
print(f"[{name}] 狀態: Terminated(結束)")
# 創建 Thread(New)
t1 = Thread(target=lifecycle_demo, args=('Thread-1',))
t2 = Thread(target=lifecycle_demo, args=('Thread-2',))
print("=== Thread 創建完成(New 狀態)===")
# 啟動 Thread(Runnable)
t1.start()
t2.start()
print("=== Thread 已啟動(Runnable 狀態)===")
# 等待完成(主 Thread 阻塞)
t1.join()
t2.join()
print("=== 所有 Thread 已終止 ===")輸出:
=== Thread 創建完成(New 狀態)===
=== Thread 已啟動(Runnable 狀態)===
[Thread-1] 狀態: Runnable(等待 CPU)
[Thread-1] 狀態: Running(開始執行)
[Thread-1] 狀態: Blocked(睡眠 1 秒)
[Thread-2] 狀態: Runnable(等待 CPU)
[Thread-2] 狀態: Running(開始執行)
[Thread-2] 狀態: Blocked(睡眠 1 秒)
[Thread-1] 狀態: Running(睡眠結束)
[Thread-1] 狀態: Blocked(等待 Lock)
[Thread-1] 狀態: Running(獲得 Lock)
[Thread-2] 狀態: Running(睡眠結束)
[Thread-2] 狀態: Blocked(等待 Lock)
[Thread-1] 狀態: Terminated(結束)
[Thread-2] 狀態: Running(獲得 Lock)
[Thread-2] 狀態: Terminated(結束)
=== 所有 Thread 已終止 ===📊 狀態轉換總結
┌──────────────────────────────────────────┐
│ Thread 生命週期 │
├──────────────────────────────────────────┤
│ New │
│ ↓ start() │
│ Runnable ←───────────────┐ │
│ ↓ OS 調度 │ │
│ Running │ │
│ ↓ │ │
│ ├─ sleep() ─────→ Blocked │
│ ├─ wait() ─────→ Blocked │
│ ├─ acquire Lock ─────→ Blocked │
│ ├─ I/O 操作 ─────→ Blocked │
│ │ ↓ │
│ │ 解除阻塞 ────────────┘
│ ↓ │
│ Terminated │
└──────────────────────────────────────────┘🎯 狀態檢查方法
1. is_alive()
from threading import Thread
import time
def worker():
time.sleep(2)
t = Thread(target=worker)
# New
print(f"New: {t.is_alive()}") # False
# Runnable/Running
t.start()
print(f"Runnable/Running: {t.is_alive()}") # True
# Blocked
time.sleep(1)
print(f"Blocked (sleep): {t.is_alive()}") # True
# Terminated
t.join()
print(f"Terminated: {t.is_alive()}") # False2. enumerate() 查看所有活動 Thread
from threading import Thread, enumerate as thread_enumerate
import time
def worker(name):
print(f"{name} 開始")
time.sleep(3)
print(f"{name} 結束")
# 創建多個 Thread
threads = [Thread(target=worker, args=(f'Worker-{i}',)) for i in range(3)]
# 啟動
for t in threads:
t.start()
# 查看所有活動 Thread
time.sleep(1)
print("\n=== 活動的 Thread ===")
for t in thread_enumerate():
print(f" - {t.name} (alive: {t.is_alive()})")
# 等待完成
for t in threads:
t.join()
print("\n=== 完成後的 Thread ===")
for t in thread_enumerate():
print(f" - {t.name} (alive: {t.is_alive()})")輸出:
Worker-0 開始
Worker-1 開始
Worker-2 開始
=== 活動的 Thread ===
- MainThread (alive: True)
- Worker-0 (alive: True)
- Worker-1 (alive: True)
- Worker-2 (alive: True)
Worker-0 結束
Worker-1 結束
Worker-2 結束
=== 完成後的 Thread ===
- MainThread (alive: True)💡 實用技巧
1. Daemon Thread(守護線程)
from threading import Thread
import time
def background_task():
count = 0
while True:
print(f"背景任務: {count}")
count += 1
time.sleep(1)
# 設為 Daemon Thread
t = Thread(target=background_task, daemon=True)
t.start()
time.sleep(3)
print("主 Thread 結束")
# Daemon Thread 會自動終止特點:
- 主 Thread 結束時,Daemon Thread 自動終止
- 不會阻止程式退出
- 適合背景監控、日誌等任務
2. Thread 超時控制
from threading import Thread
import time
def long_task():
print("開始長時間任務")
time.sleep(10)
print("任務完成")
t = Thread(target=long_task)
t.start()
# 最多等待 3 秒
t.join(timeout=3)
if t.is_alive():
print("⏱️ Thread 超時,仍在執行")
else:
print("✅ Thread 已完成")輸出:
開始長時間任務
⏱️ Thread 超時,仍在執行✅ 重點回顧
Thread 的五個狀態:
- New - 已創建,未啟動
- Runnable - 就緒,等待 CPU
- Running - 正在執行
- Blocked - 阻塞(等待 Lock、I/O、sleep)
- Terminated - 已終止
狀態轉換:
start()→ New → Runnable- OS 調度 → Runnable → Running
- Lock/I/O/sleep → Running → Blocked
- 解除阻塞 → Blocked → Runnable
- 完成/異常 → Running → Terminated
關鍵方法:
start()- 啟動 Threadjoin()- 等待 Thread 完成is_alive()- 檢查 Thread 狀態daemon=True- 設為守護線程
上一篇: 02-2. Thread 的優缺點 下一篇: 02-4. Thread 同步機制
最後更新:2025-01-06