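02-5. Thread Pools and Practical Applications
⏱️ Reading time: 18 minutes 🎯 Difficulty: ⭐⭐⭐ (Intermediate)
🤔 In One Sentence
A thread pool is a set of pre-created threads that are reused to run many tasks, avoiding the overhead of repeatedly creating and destroying threads.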
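The sketch below is a deliberately simplified version of what a thread pool does internally. A fixed number of worker threads take tasks from one shared queue.Queue until each receives a stop signal.

- The SimpleThreadPool class and its methods are hypothetical names used for illustration.
- This is not how concurrent.futures.ThreadPoolExecutor is actually implemented.
- It omits error handling and per-task result tracking.

```python
import queue
import threading


class SimpleThreadPool:
    """Hypothetical minimal pool: worker threads pull (func, arg) pairs from a shared queue."""

    _STOP = object()  # sentinel telling a worker to exit

    def __init__(self, num_workers):
        self.tasks = queue.Queue()
        self.results = []
        self._lock = threading.Lock()
        # The threads are created once, up front, and reused for every task
        self.workers = [threading.Thread(target=self._worker) for _ in range(num_workers)]
        for w in self.workers:
            w.start()

    def _worker(self):
        while True:
            item = self.tasks.get()  # blocks until a task (or the sentinel) arrives
            if item is self._STOP:
                break
            func, arg = item
            value = func(arg)
            with self._lock:
                self.results.append(value)

    def submit(self, func, arg):
        self.tasks.put((func, arg))

    def shutdown(self):
        # One sentinel per worker; the queue is FIFO, so all queued tasks run first
        for _ in self.workers:
            self.tasks.put(self._STOP)
        for w in self.workers:
            w.join()


def square(n):
    return n * n


pool = SimpleThreadPool(num_workers=3)
for i in range(10):
    pool.submit(square, i)
pool.shutdown()
print(sorted(pool.results))  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
```

Only three threads are ever created, however many tasks are submitted. Results are appended in completion order, which is why the example sorts them.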
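❌ Problems Without a Thread Pool
Problem 1: Repeatedly creating and destroying threads

import time
from threading import Thread

def task(task_id):
    time.sleep(0.1)
    return f"Task {task_id} done"

# Handle 100 tasks by creating one Thread per task
start = time.time()
threads = []
for i in range(100):
    t = Thread(target=task, args=(i,))
    t.start()
    threads.append(t)

for t in threads:
    t.join()

print(f"Time: {time.time() - start:.2f} s")
# Output: Time: 0.25 s
# ⚠️ Problems:
# 1. Creating 100 threads has a cost
# 2. All 100 threads run at once and compete for resources
# 3. There is no way to cap the number of concurrent tasks

Problem 2: Resource exhaustion

# ❌ Dangerous: creating far too many threads
# (reuses task and Thread from Problem 1)
threads = [Thread(target=task, args=(i,)) for i in range(10000)]
for t in threads:
    t.start()
# 💥 Possible consequences:
# - Running out of memory (every thread has its own stack)
# - CPU overload from constant context switching
# - System instability (thread creation may even fail with a RuntimeError)

✅ Advantages of a Thread Pool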
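Traditional approach:
Create Thread → run task → destroy Thread
Create Thread → run task → destroy Thread
Create Thread → run task → destroy Thread
⏱️ High overhead!

Thread Pool:
[Thread 1] ↘
[Thread 2] → task queue → reused
[Thread 3] ↗
⏱️ Low overhead!

Advantages:
- ✅ Avoids repeatedly creating/destroying threads
- ✅ Caps the number of concurrent tasks
- ✅ Reuses threads
- ✅ Better resource management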
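Here is a quick way to see thread reuse in action. The sketch runs 20 short tasks on a pool of 3 workers and records which thread ran each task.

- The function name which_thread is hypothetical.
- thread_name_prefix is a real ThreadPoolExecutor parameter.
- Only the upper bound of 3 threads is guaranteed. The pool may create fewer if an existing worker is idle.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def which_thread(task_id):
    time.sleep(0.01)  # simulate a short I/O wait
    return threading.current_thread().name


with ThreadPoolExecutor(max_workers=3, thread_name_prefix="worker") as executor:
    names = list(executor.map(which_thread, range(20)))

print(len(names))          # 20
print(sorted(set(names)))  # e.g. ['worker_0', 'worker_1', 'worker_2']
```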
🏊 ThreadPoolExecutor Basics
1. Creating a Thread Pool

from concurrent.futures import ThreadPoolExecutor
import time

def task(n):
    print(f"Running task {n}")
    time.sleep(1)
    return n * 2

# Create a thread pool with at most 3 threads
with ThreadPoolExecutor(max_workers=3) as executor:
    # Submit tasks; each submit() returns a Future immediately
    future1 = executor.submit(task, 1)
    future2 = executor.submit(task, 2)
    future3 = executor.submit(task, 3)

    # Get the results (result() blocks until that task has finished)
    print(f"Result 1: {future1.result()}")
    print(f"Result 2: {future2.result()}")
    print(f"Result 3: {future3.result()}")
# Leaving the with block automatically waits for all tasks to finish

Output:
Running task 1
Running task 2
Running task 3
(about 1 second passes...)
Result 1: 2
Result 2: 4
Result 3: 6

2. map(): Batch Processing
from concurrent.futures import ThreadPoolExecutor
import time

def square(n):
    time.sleep(0.5)
    return n * n

# Process 10 numbers in one batch
numbers = range(10)
with ThreadPoolExecutor(max_workers=4) as executor:
    results = executor.map(square, numbers)
    # Results come back in the same order as the inputs
    for num, result in zip(numbers, results):
        print(f"{num}² = {result}")

Output:
0² = 0
1² = 1
2² = 4
3² = 9
...

3. submit() vs map()
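from concurrent.futures import ThreadPoolExecutor

def task(n):
    return n * 2

with ThreadPoolExecutor(max_workers=3) as executor:
    # Approach 1: submit() returns a Future object
    future = executor.submit(task, 5)
    result = future.result()  # 10

    # Approach 2: map() processes a batch
    results = executor.map(task, [1, 2, 3, 4, 5])
    print(list(results))  # [2, 4, 6, 8, 10]

Comparison:
| Method | Characteristics | When to use |
|---|---|---|
| submit() | Returns a Future; results can be handled one at a time | Different tasks, or tasks that need individual control |
| map() | Batch processing; results come back in input order | The same function applied to different inputs |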
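One practical difference is not shown in the table: how each method reports an exception.

- With map(), a task's exception surfaces only when the loop reaches that task's result, and iteration stops there.
- With submit(), each Future keeps its own result or exception.

In the sketch below, the function half is hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor


def half(n):
    if n == 3:
        raise ValueError(f"cannot process {n}")
    return n / 2


with ThreadPoolExecutor(max_workers=3) as executor:
    # map(): the exception is re-raised when the loop reaches input 3,
    # and the results after it are never yielded
    collected = []
    try:
        for value in executor.map(half, [1, 2, 3, 4]):
            collected.append(value)
    except ValueError as e:
        print(f"map stopped: {e}")
    print(collected)  # [0.5, 1.0]

    # submit(): every Future holds its own outcome
    futures = [executor.submit(half, n) for n in [1, 2, 3, 4]]
    outcomes = []
    for f in futures:
        exc = f.exception()  # waits for the task; None if it succeeded
        outcomes.append(exc if exc is not None else f.result())
    print(outcomes)  # [0.5, 1.0, ValueError('cannot process 3'), 2.0]
```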
📥 Future Objects in Detail
1. Checking the state

from concurrent.futures import ThreadPoolExecutor
import time

def slow_task():
    time.sleep(3)
    return "done"

with ThreadPoolExecutor() as executor:
    future = executor.submit(slow_task)
    # Right after submit() the task may still be pending (running() == False),
    # so give a worker thread a moment to pick it up
    time.sleep(0.1)
    print(f"running: {future.running()}")  # True
    print(f"done: {future.done()}")        # False

    time.sleep(1)
    print(f"running: {future.running()}")  # True
    print(f"done: {future.done()}")        # False

    result = future.result()  # blocks until the task completes
    print(f"done: {future.done()}")  # True
    print(f"Result: {result}")

2. Setting a timeout
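from concurrent.futures import ThreadPoolExecutor, TimeoutError
import time

def slow_task():
    time.sleep(5)
    return "done"

with ThreadPoolExecutor() as executor:
    future = executor.submit(slow_task)
    try:
        result = future.result(timeout=2)  # wait at most 2 seconds
    except TimeoutError:
        print("⏱️ Task timed out")
# The timeout only limits how long we wait: slow_task keeps running,
# and leaving the with block still waits for it (about 5 seconds in total)

Output:
⏱️ Task timed out

3. Exception Handling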
from concurrent.futures import ThreadPoolExecutor

def risky_task(n):
    if n == 0:
        raise ValueError("n cannot be 0")
    return 10 / n

with ThreadPoolExecutor() as executor:
    future1 = executor.submit(risky_task, 5)
    future2 = executor.submit(risky_task, 0)  # will fail

    # A task that succeeds
    print(f"Result 1: {future1.result()}")  # 2.0

    # A task that fails: result() re-raises the exception from the worker thread
    try:
        print(f"Result 2: {future2.result()}")
    except ValueError as e:
        print(f"❌ Error: {e}")

Output:
Result 1: 2.0
❌ Error: n cannot be 0

4. as_completed(): Handle Results in Completion Order
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
import random

def task(name):
    duration = random.randint(1, 3)
    print(f"{name} started (expected {duration} s)")
    time.sleep(duration)
    return f"{name} done (took {duration} s)"

with ThreadPoolExecutor(max_workers=3) as executor:
    # Submit the tasks
    futures = [executor.submit(task, f'Task-{i}') for i in range(5)]
    # Handle them in the order they finish
    for future in as_completed(futures):
        result = future.result()
        print(f"✅ {result}")

Example output (durations are random, so the order differs between runs):
Task-0 started (expected 2 s)
Task-1 started (expected 1 s)
Task-2 started (expected 3 s)
✅ Task-1 done (took 1 s) ← finished first
Task-3 started (expected 2 s)
✅ Task-0 done (took 2 s)
Task-4 started (expected 1 s)
✅ Task-3 done (took 2 s)
✅ Task-4 done (took 1 s)
✅ Task-2 done (took 3 s) ← finished last

🌐 Practical Example 1: Downloading Files Concurrently
from concurrent.futures import ThreadPoolExecutor, as_completed
import requests
import time

def download_file(url):
    """Download a single file"""
    filename = url.split('/')[-1]
    print(f"Downloading {filename}...")
    try:
        response = requests.get(url, timeout=10)
        response.raise_for_status()
        with open(filename, 'wb') as f:
            f.write(response.content)
        return f"✅ {filename} downloaded ({len(response.content)} bytes)"
    except Exception as e:
        return f"❌ {filename} failed to download: {e}"

# Files to download (the example.com URLs are placeholders)
urls = [
    'https://example.com/file1.pdf',
    'https://example.com/file2.pdf',
    'https://example.com/file3.pdf',
    'https://example.com/file4.pdf',
    'https://example.com/file5.pdf',
]

# Download concurrently with a thread pool
start = time.time()
with ThreadPoolExecutor(max_workers=5) as executor:
    # Submit all download tasks
    futures = [executor.submit(download_file, url) for url in urls]
    # Handle the results as they finish
    for future in as_completed(futures):
        result = future.result()
        print(result)

print(f"\nTotal time: {time.time() - start:.2f} s")

Example output (with real URLs; sizes and times will vary):
Downloading file1.pdf...
Downloading file2.pdf...
Downloading file3.pdf...
Downloading file4.pdf...
Downloading file5.pdf...
✅ file2.pdf downloaded (1024000 bytes)
✅ file1.pdf downloaded (2048000 bytes)
✅ file4.pdf downloaded (512000 bytes)
✅ file3.pdf downloaded (3072000 bytes)
✅ file5.pdf downloaded (768000 bytes)

Total time: 2.34 s

🔍 Practical Example 2: Batch API Requests
from concurrent.futures import ThreadPoolExecutor
import requests
import time

def fetch_user(user_id):
    """Fetch one user's data"""
    url = f"https://api.example.com/users/{user_id}"
    try:
        response = requests.get(url, timeout=5)
        response.raise_for_status()
        data = response.json()
        return {
            'user_id': user_id,
            'name': data.get('name'),
            'status': 'success'
        }
    except Exception as e:
        return {
            'user_id': user_id,
            'error': str(e),
            'status': 'failed'
        }

# Process 100 users in one batch
user_ids = range(1, 101)
start = time.time()
with ThreadPoolExecutor(max_workers=10) as executor:
    # map() calls fetch_user for every ID, with at most 10 requests in flight
    results = executor.map(fetch_user, user_ids)

# Tally the results (they come back in the same order as user_ids)
success_count = 0
failed_count = 0
for result in results:
    if result['status'] == 'success':
        success_count += 1
        print(f"✅ User {result['user_id']}: {result['name']}")
    else:
        failed_count += 1
        print(f"❌ User {result['user_id']}: {result['error']}")

print(f"\nSucceeded: {success_count}, failed: {failed_count}")
print(f"Total time: {time.time() - start:.2f} s")

🗃️ Practical Example 3: Batch Database Queries
from concurrent.futures import ThreadPoolExecutor
import sqlite3

def query_database(query_id, keyword):
    """Run one database query"""
    # Each thread needs its own connection: by default a sqlite3 connection
    # may only be used in the thread that created it
    conn = sqlite3.connect('database.db')
    cursor = conn.cursor()
    try:
        cursor.execute(
            "SELECT * FROM products WHERE name LIKE ?",
            (f'%{keyword}%',)
        )
        results = cursor.fetchall()
        return {
            'query_id': query_id,
            'keyword': keyword,
            'count': len(results),
            'results': results[:5]  # return only the first 5 rows
        }
    finally:
        conn.close()

# Batch queries
keywords = ['apple', 'banana', 'cherry', 'date', 'elderberry']
with ThreadPoolExecutor(max_workers=3) as executor:
    futures = [
        executor.submit(query_database, i, keyword)
        for i, keyword in enumerate(keywords)
    ]
    for future in futures:
        result = future.result()
        print(f"Query {result['query_id']} ({result['keyword']}): "
              f"found {result['count']} rows")

🖼️ Practical Example 4: Image Processing
from concurrent.futures import ThreadPoolExecutor
from PIL import Image
import os

def resize_image(input_path, output_dir, size=(800, 600)):
    """Resize one image"""
    # Defined before try so the except branch can always use it
    filename = os.path.basename(input_path)
    try:
        output_path = os.path.join(output_dir, f"resized_{filename}")
        # Open the image (the with block closes the file afterwards)
        with Image.open(input_path) as img:
            # Shrink in place to fit within size, keeping the aspect ratio
            img.thumbnail(size)
            # Save
            img.save(output_path)
        return f"✅ {filename} processed"
    except Exception as e:
        return f"❌ {filename} failed: {e}"

# Collect all images
image_dir = "./images"
output_dir = "./resized_images"
os.makedirs(output_dir, exist_ok=True)
image_files = [
    os.path.join(image_dir, f)
    for f in os.listdir(image_dir)
    if f.lower().endswith(('.jpg', '.png', '.jpeg'))
]
print(f"Found {len(image_files)} images")

# Process them concurrently
with ThreadPoolExecutor(max_workers=4) as executor:
    futures = [
        executor.submit(resize_image, img_path, output_dir)
        for img_path in image_files
    ]
    for future in futures:
        print(future.result())

📊 Practical Example 5: Website Health Check
from concurrent.futures import ThreadPoolExecutor, as_completed
import requests
import time

def check_website(url):
    """Check a website's status"""
    start = time.time()
    try:
        response = requests.get(url, timeout=5)
        elapsed = time.time() - start
        return {
            'url': url,
            'status': response.status_code,
            'response_time': elapsed,
            'ok': response.status_code == 200
        }
    except Exception as e:
        elapsed = time.time() - start
        return {
            'url': url,
            'status': None,
            'response_time': elapsed,
            'ok': False,
            'error': str(e)
        }

# Websites to check
websites = [
    'https://google.com',
    'https://github.com',
    'https://stackoverflow.com',
    'https://reddit.com',
    'https://twitter.com',
]

print("Starting health check...\n")
with ThreadPoolExecutor(max_workers=5) as executor:
    futures = [executor.submit(check_website, url) for url in websites]
    for future in as_completed(futures):
        result = future.result()
        status = "✅" if result['ok'] else "❌"
        print(f"{status} {result['url']}")
        # 'status' is None when the request failed, so fall back to N/A
        print(f"   Status code: {result['status'] or 'N/A'}")
        print(f"   Response time: {result['response_time']:.2f} s")
        if 'error' in result:
            print(f"   Error: {result['error']}")
        print()

🎯 Thread Pool Best Practices
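1. Choose a suitable max_workers

import os
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

# ❌ Not good: far too many threads
with ThreadPoolExecutor(max_workers=1000) as executor:
    pass

# ✅ Good: decide based on the type of task
# I/O-bound: a common rule of thumb is 2-4 times the number of CPU cores
max_workers = (os.cpu_count() or 1) * 2  # os.cpu_count() may return None
with ThreadPoolExecutor(max_workers=max_workers) as executor:
    pass

# CPU-bound: use ProcessPoolExecutor instead, with about one process per core
# (in a real script, put this under if __name__ == "__main__":)
with ProcessPoolExecutor(max_workers=os.cpu_count()) as executor:
    pass

2. Use a Context Manager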
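from concurrent.futures import ThreadPoolExecutor

def task():
    print("working")

# ❌ Not ideal: managing the pool by hand
# (easy to forget shutdown(), and it is skipped if an exception occurs first)
executor = ThreadPoolExecutor(max_workers=5)
executor.submit(task)
executor.shutdown(wait=True)

# ✅ Good: managed automatically
with ThreadPoolExecutor(max_workers=5) as executor:
    executor.submit(task)
# shutdown(wait=True) is called automatically when the block exits

3. Handle Exceptions Properly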
from concurrent.futures import ThreadPoolExecutor

def risky_task(n):
    if n < 0:
        raise ValueError(f"Invalid number: {n}")
    return n * 2

with ThreadPoolExecutor() as executor:
    futures = [executor.submit(risky_task, n) for n in range(-2, 3)]
    for future in futures:
        try:
            result = future.result()
            print(f"Result: {result}")
        except Exception as e:
            print(f"Error: {e}")
            # Log the error, retry, or handle it some other way

4. Monitor Progress
from concurrent.futures import ThreadPoolExecutor, as_completed
import time

def task(task_id):
    time.sleep(1)
    return f"Task {task_id} done"

tasks = range(20)
total = len(tasks)
with ThreadPoolExecutor(max_workers=5) as executor:
    futures = [executor.submit(task, i) for i in tasks]
    completed = 0
    for future in as_completed(futures):
        completed += 1
        progress = (completed / total) * 100
        print(f"Progress: {progress:.1f}% ({completed}/{total})")
        result = future.result()
        print(f"  {result}")

⚡ Performance Comparison
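import time
from threading import Thread
from concurrent.futures import ThreadPoolExecutor

def task(n):
    time.sleep(0.01)
    return n * 2

tasks = range(100)

# Approach 1: run the tasks one after another
start = time.time()
results = [task(n) for n in tasks]
print(f"Sequential: {time.time() - start:.2f} s")

# Approach 2: create threads by hand (one per task)
start = time.time()
threads = [Thread(target=task, args=(n,)) for n in tasks]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(f"Manual threads: {time.time() - start:.2f} s")

# Approach 3: thread pool with 10 workers
start = time.time()
with ThreadPoolExecutor(max_workers=10) as executor:
    results = list(executor.map(task, tasks))
print(f"Thread Pool: {time.time() - start:.2f} s")

Example output (timings vary by machine):
Sequential: 1.05 s
Manual threads: 0.15 s
Thread Pool: 0.12 s

With max_workers=10, the 100 tasks run in about 10 rounds, so the pool needs at least 10 × 0.01 = 0.1 s. One thread per task can, in principle, finish sooner at this small scale. The pool's real advantages are capped concurrency and thread reuse rather than raw speed, and they matter more as the number of tasks grows.

🆚 ThreadPoolExecutor vs ProcessPoolExecutor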
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import time

def cpu_task(n):
    """CPU-bound task"""
    total = 0
    for i in range(10000000):
        total += i
    return total

def io_task(n):
    """I/O-bound task"""
    time.sleep(1)
    return n

# The __main__ guard is required when worker processes are started with
# "spawn" (the default on Windows and macOS)
if __name__ == "__main__":
    # CPU-bound: the Process Pool is faster
    start = time.time()
    with ProcessPoolExecutor(max_workers=4) as executor:
        results = list(executor.map(cpu_task, range(4)))
    print(f"Process Pool (CPU): {time.time() - start:.2f} s")

    start = time.time()
    with ThreadPoolExecutor(max_workers=4) as executor:
        results = list(executor.map(cpu_task, range(4)))
    print(f"Thread Pool (CPU): {time.time() - start:.2f} s")

    # I/O-bound: the Thread Pool runs four 1-second waits in about 1 second
    start = time.time()
    with ThreadPoolExecutor(max_workers=4) as executor:
        results = list(executor.map(io_task, range(4)))
    print(f"Thread Pool (I/O): {time.time() - start:.2f} s")

Example output (timings vary by machine):
Process Pool (CPU): 0.65 s ← fast
Thread Pool (CPU): 2.50 s ← slow (GIL: only one thread runs Python bytecode at a time)
Thread Pool (I/O): 1.02 s ← fast

✅ Key Takeaways
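Advantages of a thread pool:
- ✅ Avoids repeatedly creating/destroying threads
- ✅ Caps the number of concurrent tasks
- ✅ Reuses threads
- ✅ Simpler code
Using ThreadPoolExecutor:
- submit() - submits a single task and returns a Future
- map() - batch processing; results come back in input order
- as_completed() - a function in concurrent.futures that yields futures in the order they finish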
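as_completed() yields futures out of order, so you need a way to tell which input each result belongs to. A common idiom is a dict from each Future back to its input. Below is a minimal sketch; slow_len is a hypothetical stand-in for an I/O-bound function.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def slow_len(word):
    time.sleep(len(word) * 0.01)  # longer words "take longer" (simulated I/O)
    return len(word)


words = ["kiwi", "fig", "banana", "apple"]
with ThreadPoolExecutor(max_workers=4) as executor:
    # Map each Future back to the input that produced it
    future_to_word = {executor.submit(slow_len, w): w for w in words}
    lengths = {}
    for future in as_completed(future_to_word):
        word = future_to_word[future]
        lengths[word] = future.result()
        print(f"{word}: {lengths[word]}")

print(lengths == {w: len(w) for w in words})  # True
```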
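Future objects:
- result() - gets the result (blocks until it is ready)
- done() - checks whether the task has finished
- running() - checks whether the task is currently running
- exception() - gets the exception the task raised, or None if it succeeded (also waits for completion)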
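exception() is not demonstrated earlier, so here is a small sketch pairing it with add_done_callback(). add_done_callback() registers a function that runs once the Future finishes.

- divide and report are hypothetical names.
- The callback usually runs in the worker thread, so it should be quick and thread-safe.
- If the Future is already done when the callback is added, the callback runs immediately in the calling thread.

```python
from concurrent.futures import ThreadPoolExecutor


def divide(a, b):
    return a / b


def report(future):
    # Receives the finished Future; exception() is None when the task succeeded
    exc = future.exception()
    if exc is None:
        messages.append(f"ok: {future.result()}")
    else:
        messages.append(f"failed: {exc!r}")


messages = []
with ThreadPoolExecutor(max_workers=2) as executor:
    good = executor.submit(divide, 10, 2)
    bad = executor.submit(divide, 1, 0)
    good.add_done_callback(report)
    bad.add_done_callback(report)

print(good.exception())       # None
print(repr(bad.exception()))  # ZeroDivisionError('division by zero')
print(sorted(messages))
```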
Suitable for:
- ✅ I/O-bound tasks (network requests, file operations)
- ✅ Large numbers of short-lived tasks
- ❌ CPU-bound tasks (use ProcessPoolExecutor instead)
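The task-type rule of thumb above can be turned into a small helper.

- suggest_max_workers and its io_multiplier parameter are hypothetical, and the multiplier of 4 is just the upper end of the 2-4× heuristic used earlier.
- For comparison, Python 3.8 and later document ThreadPoolExecutor's default max_workers as roughly min(32, CPU count + 4).

```python
import os


def suggest_max_workers(kind, io_multiplier=4):
    """Hypothetical helper: pick a worker count from the task type."""
    cpus = os.cpu_count() or 1  # os.cpu_count() can return None
    if kind == "cpu":
        # CPU-bound: about one process per core (use with ProcessPoolExecutor)
        return cpus
    if kind == "io":
        # I/O-bound: threads mostly wait, so use several per core
        return cpus * io_multiplier
    raise ValueError(f"unknown task kind: {kind!r}")


print(suggest_max_workers("cpu"), suggest_max_workers("io"))
# For comparison: the approximate ThreadPoolExecutor default since Python 3.8
print(min(32, (os.cpu_count() or 1) + 4))
```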
Best practices:
- Use the with statement
- Set max_workers sensibly
- Handle exceptions properly
- Monitor task progress
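Best practice 3 suggests logging and retrying failed tasks. One way to do that is to wrap the task in a retry helper before submitting it, so each Future fails only after all attempts are used up. This is a minimal sketch with some deliberate simplifications:

- with_retries and flaky are hypothetical.
- It retries on any exception; real code should usually retry only transient errors.
- The linear backoff is an arbitrary choice.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def with_retries(func, *args, attempts=3, delay=0.05):
    """Hypothetical helper: call func(*args), retrying on any exception."""
    for attempt in range(1, attempts + 1):
        try:
            return func(*args)
        except Exception:
            if attempt == attempts:
                raise  # out of attempts: the Future will hold this exception
            time.sleep(delay * attempt)  # simple linear backoff


# Hypothetical flaky task: fails on the first two calls for each input
call_counts = {}


def flaky(n):
    call_counts[n] = call_counts.get(n, 0) + 1
    if call_counts[n] < 3:
        raise ConnectionError(f"temporary failure for {n}")
    return n * 10


with ThreadPoolExecutor(max_workers=3) as executor:
    futures = [executor.submit(with_retries, flaky, n) for n in range(5)]
    results = [f.result() for f in futures]

print(results)  # [0, 10, 20, 30, 40]
```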
Previous: 02-4. Thread Synchronization Mechanisms | Next: 03-1. Overview of Concurrency Models
Last updated: 2025-01-06