02-7. 何時用 Thread?何時用 Process?決策指南

⏱️ 閱讀時間: 10 分鐘 🎯 難度: ⭐⭐ (簡單)


🎯 本篇重點

提供清晰的決策樹和實戰案例,幫助你快速判斷何時使用 Thread、何時使用 Process。


🤔 快速決策流程圖

開始:我需要併發處理
    ↓
┌──────────────────────────┐
│ 任務類型是什麼?          │
└──────────────────────────┘
    ↓
    ├─ I/O 密集型(網路、檔案、資料庫)
    │   → 使用 Multi-threading
    │      └─ 需要大量併發(>1000)?
    │          ├─ 是 → 使用 asyncio
    │          └─ 否 → 使用 threading
    │
    └─ CPU 密集型(計算、資料處理)
        → 使用 Multi-processing
           └─ 需要數據共享?
               ├─ 否 → 使用 Pool
               └─ 是 → 使用 Process + Manager/Value

📊 決策矩陣

情境 1:任務類型

任務類型推薦方案原因
網路請求ThreadI/O 等待時釋放 GIL
檔案讀寫ThreadI/O 操作
資料庫查詢ThreadI/O 等待
數學計算Process需要 CPU 並行
影像處理ProcessCPU 密集
資料分析ProcessCPU 密集
大量併發 I/Oasyncio最高效

情境 2:程式特性

需求ThreadProcess
需要共享大量資料✅ 推薦❌ 成本高
需要完全隔離❌ 不行✅ 推薦
需要快速創建✅ 推薦❌ 較慢
需要穩定性❌ 一崩全崩✅ 隔離
需要利用多核❌ GIL 限制✅ 可以
數量需要很多✅ 可以❌ 受限

🔍 實戰案例分析

案例 1:網路爬蟲

需求:

  • 爬取 1000 個網頁
  • 每個請求需要 1-2 秒
  • 需要解析 HTML

分析:

# ✅ 推薦:Multi-threading
from threading import Thread
import requests

def crawl(url):
    response = requests.get(url)  # I/O 密集
    # 簡單的 HTML 解析(不太耗 CPU)
    return parse_html(response.text)

# Thread 方案
threads = [Thread(target=crawl, args=(url,)) for url in urls[:1000]]
for t in threads[:50]:  # 每次 50 個並發
    t.start()

為什麼不用 Process?

  • 創建 1000 個 Process 成本太高
  • 網路請求是 I/O 等待,Thread 足夠

為什麼不用 asyncio?

  • 如果使用 requests,無法用 asyncio
  • 若用 aiohttp,asyncio 更佳

最佳方案:

# 🏆 最佳:Thread Pool
from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor(max_workers=50) as executor:
    results = executor.map(crawl, urls)

案例 2:影像批量處理

需求:

  • 處理 10000 張圖片
  • 每張需要壓縮、濾鏡、轉檔
  • 單張處理需要 0.5 秒(CPU 密集)

分析:

# ✅ 推薦:Multi-processing
from multiprocessing import Pool
from PIL import Image

def process_image(image_path):
    img = Image.open(image_path)
    # CPU 密集運算
    img = img.resize((800, 600))
    img = apply_filter(img)
    img.save(output_path, quality=85)

# Process Pool 方案
with Pool(processes=8) as pool:  # 使用 8 核 CPU
    pool.map(process_image, image_paths)

為什麼不用 Thread?

  • 影像處理是 CPU 密集
  • GIL 限制,Thread 無法並行
  • 測試結果:Thread 10 秒 vs Process 2 秒

成本對比:

# ❌ Thread 版本(慢)
# 10000 張 × 0.5 秒 ÷ 1 核 = 5000 秒

# ✅ Process 版本(快)
# 10000 張 × 0.5 秒 ÷ 8 核 = 625 秒

案例 3:Web API 伺服器

需求:

  • 處理 HTTP 請求
  • 查詢資料庫
  • 返回 JSON 結果
  • 需要高併發

分析:

# 🏆 混合方案:Process + Thread

# Gunicorn 配置
gunicorn app:app \
    --workers 4 \          # 4 個 Process(利用 4 核 CPU)
    --threads 10 \         # 每個 Process 10 個 Thread
    --worker-class gthread

架構圖:

Gunicorn Master
├─ Worker Process 1 (CPU core 1)
│  ├─ Thread 1 → 處理請求 A (查詢資料庫)
│  ├─ Thread 2 → 處理請求 B (查詢資料庫)
│  └─ ... (10 threads)
│
├─ Worker Process 2 (CPU core 2)
│  └─ ... (10 threads)
│
├─ Worker Process 3 (CPU core 3)
│  └─ ... (10 threads)
│
└─ Worker Process 4 (CPU core 4)
   └─ ... (10 threads)

總共:4 Process × 10 Thread = 40 個併發連線

為什麼用 Process?

  • 利用多核 CPU
  • 隔離性:一個 Worker 崩潰不影響其他

為什麼用 Thread?

  • 資料庫查詢是 I/O 等待
  • Thread 可以在 I/O 期間處理其他請求

案例 4:機器學習模型訓練

需求:

  • 訓練多個 ML 模型
  • 每個模型需要大量 CPU 計算
  • 模型之間不需要通訊

分析:

# ✅ 推薦:Multi-processing
from multiprocessing import Process
import numpy as np
from sklearn.ensemble import RandomForestClassifier

def train_model(data, params):
    X, y = data
    model = RandomForestClassifier(**params)
    model.fit(X, y)  # CPU 密集
    return model

# Process 方案
processes = []
for params in model_params:
    p = Process(target=train_model, args=(data, params))
    p.start()
    processes.append(p)

for p in processes:
    p.join()

為什麼不用 Thread?

  • NumPy/scikit-learn 的部分操作釋放 GIL
  • 但 Python 層面的計算仍受 GIL 限制
  • Process 能保證完全並行

實測數據:

4 個模型訓練:
- 單執行:400 秒
- Thread:380 秒(僅快 5%)
- Process:110 秒(快 3.6 倍)

案例 5:即時聊天伺服器

需求:

  • 處理 10000+ 個連線
  • 每個連線需要保持 WebSocket
  • 轉發訊息給其他用戶

分析:

# 🏆 最佳:asyncio
import asyncio
import websockets

async def handle_client(websocket, path):
    async for message in websocket:
        # 轉發給其他用戶(I/O 操作)
        await broadcast(message)

# 單 Thread,處理 10000+ 連線
asyncio.run(websockets.serve(handle_client, 'localhost', 8765))

為什麼不用 Thread?

# ❌ Thread 方案(不可行)
# 10000 個連線 = 10000 個 Thread
# 記憶體:10000 × 8MB = 80GB(系統崩潰)

為什麼不用 Process?

# ❌ Process 方案(更不可行)
# 10000 個連線 = 10000 個 Process
# 完全無法實現

為什麼用 asyncio?

  • 單 Thread 可處理 10000+ 連線
  • 記憶體:僅幾 MB
  • I/O 多路復用(epoll/kqueue)

🎯 詳細決策表

按任務類型選擇

任務類型特徵ThreadProcessasyncio
HTTP 請求I/O 密集✅ 推薦❌ 成本高🏆 最佳
檔案讀寫I/O 密集✅ 推薦❌ 成本高✅ 可以
資料庫查詢I/O 密集✅ 推薦❌ 成本高✅ 可以
數學計算CPU 密集❌ 無效🏆 最佳❌ 無效
影像處理CPU 密集❌ 無效🏆 最佳❌ 無效
影片轉檔CPU 密集❌ 無效🏆 最佳❌ 無效
大量 WebSocketI/O 密集❌ 太多❌ 不可行🏆 最佳

按數量規模選擇

併發數量ThreadProcessasyncio
1-10✅ 可以✅ 可以✅ 可以
10-100✅ 推薦✅ 推薦✅ 推薦
100-1000✅ 推薦⚠️ 謹慎✅ 推薦
1000-10000⚠️ 謹慎❌ 不可行🏆 最佳
10000+❌ 不可行❌ 不可行🏆 唯一選擇

⚠️ 特殊情況處理

情況 1:需要共享大量資料

場景:

  • 多個任務需要存取同一個大型資料集(1GB+)
  • 資料只讀或很少修改

選擇:

# ✅ Thread(共享記憶體,無需複製)
from threading import Thread

large_dataset = load_1gb_data()  # 載入一次

def worker(task_id):
    # 所有 Thread 直接存取 large_dataset
    result = process(large_dataset[task_id])
    return result

threads = [Thread(target=worker, args=(i,)) for i in range(100)]

為什麼不用 Process?

# ❌ Process 需要複製資料
# 100 個 Process × 1GB = 100GB 記憶體(爆炸)

# ⚠️ 除非使用 shared_memory(複雜)
from multiprocessing import shared_memory
# 需要額外處理,複雜度高

情況 2:需要絕對穩定性

場景:

  • 金融交易系統
  • 醫療設備控制
  • 飛機飛行控制

選擇:

# ✅ Process(完全隔離)
from multiprocessing import Process

def critical_task(task_data):
    try:
        result = execute_critical_operation(task_data)
        return result
    except Exception as e:
        # 錯誤被隔離在這個 Process
        log_error(e)

# 每個任務獨立 Process
processes = [Process(target=critical_task, args=(data,)) for data in tasks]

為什麼不用 Thread?

# ❌ Thread:一個崩潰可能影響全部
# 金融系統不能接受這種風險

情況 3:需要動態擴展

場景:

  • 任務數量不確定
  • 需要根據負載自動調整
  • 需要快速創建/銷毀

選擇:

# ✅ Thread Pool(動態管理)
from concurrent.futures import ThreadPoolExecutor

def dynamic_worker(task):
    # 處理任務
    return result

# 自動管理 Thread 生命週期
with ThreadPoolExecutor(max_workers=50) as executor:
    # 動態提交任務
    futures = []
    for task in incoming_tasks():
        future = executor.submit(dynamic_worker, task)
        futures.append(future)

    # 等待結果
    for future in futures:
        result = future.result()

📋 完整選擇清單

✅ 選擇 Thread 的理由

  • 任務是 I/O 密集型
  • 需要共享大量資料
  • 需要快速創建/銷毀
  • 併發數量 < 1000
  • 需要簡單的通訊
  • 記憶體有限

✅ 選擇 Process 的理由

  • 任務是 CPU 密集型
  • 需要利用多核 CPU
  • 需要完全隔離
  • 需要繞過 GIL
  • 穩定性要求高
  • 數量適中(< 100)

✅ 選擇 asyncio 的理由

  • 需要大量併發(> 1000)
  • 任務是 I/O 密集型
  • 需要最低記憶體佔用
  • 需要精確控制事件循環
  • 使用支援 async/await 的庫

🎯 實用決策代碼

import os
from multiprocessing import cpu_count

def choose_concurrency_model(task_type, num_tasks, data_size_mb):
    """
    自動選擇併發模型

    Args:
        task_type: 'io' or 'cpu'
        num_tasks: 任務數量
        data_size_mb: 共享資料大小(MB)

    Returns:
        'thread', 'process', or 'asyncio'
    """
    if task_type == 'cpu':
        # CPU 密集型一律用 Process
        return 'process'

    elif task_type == 'io':
        if num_tasks > 1000:
            # 大量 I/O 用 asyncio
            return 'asyncio'
        elif data_size_mb > 100:
            # 大型共享資料用 Thread
            return 'thread'
        else:
            # 預設用 Thread
            return 'thread'

    else:
        raise ValueError("task_type must be 'io' or 'cpu'")

# 使用範例
model = choose_concurrency_model('io', 500, 50)
print(f"推薦使用: {model}")
# 輸出:推薦使用: thread

model = choose_concurrency_model('cpu', 8, 10)
print(f"推薦使用: {model}")
# 輸出:推薦使用: process

model = choose_concurrency_model('io', 5000, 1)
print(f"推薦使用: {model}")
# 輸出:推薦使用: asyncio

✅ 重點回顧

快速決策:

  1. I/O 密集型 → Thread(或 asyncio)
  2. CPU 密集型 → Process
  3. 大量併發 → asyncio
  4. 需要共享資料 → Thread
  5. 需要隔離 → Process

常見組合:

  • Web 伺服器:Process + Thread(Gunicorn)
  • 爬蟲:Thread Pool 或 asyncio
  • 資料分析:Process Pool
  • 即時通訊:asyncio
  • 批量處理:Process Pool

記住:

  • ✅ 先分析任務類型(I/O vs CPU)
  • ✅ 再考慮數量規模
  • ✅ 最後考慮特殊需求
  • ✅ 不確定就先用 Thread 試試

上一篇: 02-6. Multi-threading vs Multi-processing 完整對比 下一篇: 03-1. IPC 概述


最後更新:2025-01-06

0%