05-4. concurrent.futures 使用指南

⏱️ 閱讀時間: 12 分鐘 🎯 難度: ⭐⭐ (簡單)


🎯 本篇重點

掌握 Python concurrent.futures 模組,這是一個高層級的併發介面,統一了 Thread 和 Process 的使用方式。


🤔 為什麼需要 concurrent.futures?

傳統方式的問題

# threading: every Thread is managed by hand
# NOTE: `tasks` and `work` are placeholders defined elsewhere
from threading import Thread
threads = []
for task in tasks:
    t = Thread(target=work, args=(task,))
    t.start()
    threads.append(t)
# Must remember to join every thread ourselves
for t in threads:
    t.join()

# multiprocessing: every Process is managed by hand
# NOTE: `tasks` and `work` are placeholders defined elsewhere
from multiprocessing import Process
processes = []
for task in tasks:
    p = Process(target=work, args=(task,))
    p.start()
    processes.append(p)
# Must remember to join every process ourselves
for p in processes:
    p.join()

concurrent.futures 的優勢

# Unified interface! Only the Executor class needs to change.
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

# Thread version
with ThreadPoolExecutor() as executor:
    results = executor.map(work, tasks)

# Process version (only this one line changes)
with ProcessPoolExecutor() as executor:
    results = executor.map(work, tasks)

優點:

  • ✅ 統一的 API
  • ✅ 自動管理 Pool
  • ✅ 更容易獲取結果
  • ✅ 更好的錯誤處理

1️⃣ ThreadPoolExecutor 基礎

基本用法

from concurrent.futures import ThreadPoolExecutor
import time

def task(n):
    """Worker task: sleep 1 second, then return n squared."""
    print(f"任務 {n} 開始")
    time.sleep(1)
    return n * n

# Use a thread pool with at most 3 concurrent workers
with ThreadPoolExecutor(max_workers=3) as executor:
    # submit() returns a Future object immediately (non-blocking)
    future1 = executor.submit(task, 1)
    future2 = executor.submit(task, 2)
    future3 = executor.submit(task, 3)

    # result() blocks until the corresponding task finishes
    print(f"結果 1: {future1.result()}")  # blocks until done
    print(f"結果 2: {future2.result()}")
    print(f"結果 3: {future3.result()}")

# Leaving the with-block shuts the executor down and waits for all tasks

submit() 方法

from concurrent.futures import ThreadPoolExecutor
import time

def download(url):
    """Simulated download: sleep 2 seconds, then report completion."""
    print(f"下載: {url}")
    time.sleep(2)
    return f"{url} 完成"

with ThreadPoolExecutor(max_workers=5) as executor:
    # Submit one task per URL, keeping the Futures for later
    futures = []
    urls = [f'https://example.com/file{i}' for i in range(10)]

    for url in urls:
        future = executor.submit(download, url)
        futures.append(future)

    # Collect results in submission order
    for future in futures:
        result = future.result(timeout=5)  # wait at most 5 seconds each
        print(result)

map() 方法

from concurrent.futures import ThreadPoolExecutor
import time

def square(x):
    # Simulate a little work before squaring
    time.sleep(0.5)
    return x * x

with ThreadPoolExecutor(max_workers=4) as executor:
    # map() preserves the input order
    numbers = range(10)
    results = executor.map(square, numbers)

    # Results come back in the original input order
    for num, result in zip(numbers, results):
        print(f"{num}^2 = {result}")

# Output:
# 0^2 = 0
# 1^2 = 1
# 2^2 = 4
# ... (order preserved)

map() vs submit()

from concurrent.futures import ThreadPoolExecutor

def task(x):
    """Double the input value."""
    return x * 2

with ThreadPoolExecutor() as executor:
    # map: batch submission, results keep the input order
    results1 = list(executor.map(task, [1, 2, 3]))
    print(results1)  # [2, 4, 6](有序)

    # submit: one Future per item, fully flexible
    futures = []
    for x in [1, 2, 3]:
        futures.append(executor.submit(task, x))
    results2 = []
    for f in futures:
        results2.append(f.result())
    print(results2)  # [2, 4, 6]

選擇:

  • map():任務相同,批量處理,需要順序
  • submit():任務不同,需要靈活控制

2️⃣ ProcessPoolExecutor 基礎

基本用法

from concurrent.futures import ProcessPoolExecutor
import os

def cpu_task(n):
    """CPU-bound task: sum i*i over n million iterations."""
    print(f"任務 {n}, PID: {os.getpid()}")
    total = 0
    for i in range(n * 1000000):
        total += i * i
    return total

if __name__ == '__main__':
    # Use a process pool for true parallelism (sidesteps the GIL)
    with ProcessPoolExecutor(max_workers=4) as executor:
        tasks = [1, 2, 3, 4, 5]
        results = executor.map(cpu_task, tasks)

        # map() yields results in submission order
        for task, result in zip(tasks, results):
            print(f"任務 {task} 結果: {result}")

輸出:

任務 1, PID: 1001
任務 2, PID: 1002
任務 3, PID: 1003
任務 4, PID: 1004
任務 5, PID: 1001  ← worker 行程被 Process Pool 重複使用(PID 與任務 1 相同)
...

Thread vs Process 性能對比

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import time

def cpu_bound_task(n):
    """CPU-bound task: pure computation, no I/O."""
    return sum(i * i for i in range(n))

def io_bound_task(n):
    """I/O-bound task: simulated with a 1-second sleep."""
    time.sleep(1)
    return n

if __name__ == '__main__':
    tasks = [10000000] * 4

    # Thread pool on CPU-bound work: the GIL serializes the computation
    start = time.time()
    with ThreadPoolExecutor(max_workers=4) as executor:
        list(executor.map(cpu_bound_task, tasks))
    print(f"ThreadPool (CPU): {time.time() - start:.2f}s")
    # Illustrative output: ~10.0s (no speedup)

    # Process pool on CPU-bound work: true parallelism across cores
    start = time.time()
    with ProcessPoolExecutor(max_workers=4) as executor:
        list(executor.map(cpu_bound_task, tasks))
    print(f"ProcessPool (CPU): {time.time() - start:.2f}s")
    # Illustrative output: ~2.5s (about 4x speedup)

    # Thread pool on I/O-bound work: the sleeps overlap, so threads help
    tasks_io = [1] * 10
    start = time.time()
    with ThreadPoolExecutor(max_workers=10) as executor:
        list(executor.map(io_bound_task, tasks_io))
    print(f"ThreadPool (I/O): {time.time() - start:.2f}s")
    # Illustrative output: ~1.0s (speedup)

3️⃣ Future 物件

Future 的方法

from concurrent.futures import ThreadPoolExecutor
import time

def slow_task(n):
    # Sleep n seconds, then return n * 10
    time.sleep(n)
    return n * 10

with ThreadPoolExecutor() as executor:
    future = executor.submit(slow_task, 3)

    # Inspect state without blocking
    print(f"Running: {future.running()}")  # True (once a worker picks it up)
    print(f"Done: {future.done()}")        # False

    # Still running one second later
    time.sleep(1)
    print(f"Running: {future.running()}")  # True
    print(f"Done: {future.done()}")        # False

    # result() blocks until the task finishes
    result = future.result(timeout=5)  # wait at most 5 seconds
    print(f"結果: {result}")            # 30

    print(f"Done: {future.done()}")     # True

取消 Future

from concurrent.futures import ThreadPoolExecutor
import time

def long_task(n):
    # 5-second task, long enough to observe queue behavior
    time.sleep(5)
    return n

with ThreadPoolExecutor(max_workers=2) as executor:
    # Submit three tasks to a pool with only two workers
    future1 = executor.submit(long_task, 1)
    future2 = executor.submit(long_task, 2)
    future3 = executor.submit(long_task, 3)  # waits in the queue

    time.sleep(0.1)

    # Try to cancel; only tasks still queued can be cancelled
    print(f"取消 future1: {future1.cancel()}")  # False (already running)
    print(f"取消 future2: {future2.cancel()}")  # False (already running)
    print(f"取消 future3: {future3.cancel()}")  # True (still queued)

    # Check cancellation state
    print(f"future1 已取消: {future1.cancelled()}")  # False
    print(f"future3 已取消: {future3.cancelled()}")  # True

添加回調函式

from concurrent.futures import ThreadPoolExecutor
import time

def task(n):
    time.sleep(1)
    return n * 2

def on_complete(future):
    """Callback invoked when a task finishes; receives the Future."""
    result = future.result()
    print(f"✓ 任務完成,結果: {result}")

with ThreadPoolExecutor() as executor:
    # Submit tasks and attach a completion callback to each
    for i in range(5):
        future = executor.submit(task, i)
        future.add_done_callback(on_complete)

# Output:
# ✓ 任務完成,結果: 0
# ✓ 任務完成,結果: 2
# ... (order may vary)

4️⃣ as_completed() 和 wait()

as_completed():按完成順序

from concurrent.futures import ThreadPoolExecutor, as_completed
import time
import random

def task(n):
    """Task with a random duration between 1 and 3 seconds."""
    sleep_time = random.uniform(1, 3)
    time.sleep(sleep_time)
    return n, sleep_time

with ThreadPoolExecutor(max_workers=5) as executor:
    # Submit all tasks up front
    futures = [executor.submit(task, i) for i in range(10)]

    # Process results in completion order, not submission order
    for future in as_completed(futures):
        n, sleep_time = future.result()
        print(f"任務 {n} 完成(耗時 {sleep_time:.2f}s)")

# Output: in completion order, not submission order
# 任務 3 完成(耗時 1.2s)
# 任務 7 完成(耗時 1.5s)
# 任務 1 完成(耗時 2.1s)
# ...

wait():等待多個 Future

from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED, FIRST_COMPLETED
import time

def task(n):
    time.sleep(n)
    return n

with ThreadPoolExecutor() as executor:
    futures = [executor.submit(task, i) for i in [3, 1, 2]]

    # Block until every future has finished
    done, not_done = wait(futures, return_when=ALL_COMPLETED)
    print(f"完成: {len(done)}, 未完成: {len(not_done)}")

    # Block only until the first future finishes (or 5 seconds pass)
    futures = [executor.submit(task, i) for i in [3, 1, 2]]
    done, not_done = wait(futures, return_when=FIRST_COMPLETED, timeout=5)
    print(f"完成: {len(done)}, 未完成: {len(not_done)}")

5️⃣ 實戰案例

案例 1:批量下載檔案

from concurrent.futures import ThreadPoolExecutor, as_completed
import os
import requests
import time

def download_file(url):
    """Download a single file into downloads/ and return a status message."""
    try:
        response = requests.get(url, timeout=10)
        # Fall back to index.html when the URL ends with '/'
        filename = url.split('/')[-1] or 'index.html'
        # BUG FIX: the f-string placeholders had been mangled into the
        # literal "(unknown)"; restore the intended {filename} interpolation.
        with open(f'downloads/{filename}', 'wb') as f:
            f.write(response.content)
        return f"✓ {filename} ({len(response.content)} bytes)"
    except Exception as e:
        return f"✗ {url}: {e}"

def batch_download(urls, max_workers=5):
    """Download all URLs concurrently; returns [(url, status message)]."""
    start = time.time()
    results = []

    # Ensure the target directory exists before any worker tries to write
    os.makedirs('downloads', exist_ok=True)

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Map each Future back to its URL so failures can be attributed
        future_to_url = {executor.submit(download_file, url): url for url in urls}

        # Handle results as they complete, not in submission order
        for future in as_completed(future_to_url):
            url = future_to_url[future]
            try:
                result = future.result()
                results.append((url, result))
                print(result)
            except Exception as e:
                print(f"✗ 錯誤: {url}, {e}")

    elapsed = time.time() - start
    print(f"\n完成 {len(results)}/{len(urls)} 個下載,耗時 {elapsed:.2f}s")
    return results

# Usage
urls = [
    'https://www.python.org',
    'https://www.github.com',
    'https://www.stackoverflow.com',
]
batch_download(urls, max_workers=3)

案例 2:並行處理資料

from concurrent.futures import ProcessPoolExecutor
import pandas as pd
import numpy as np

def process_chunk(data_chunk):
    """CPU-bound transform applied to one chunk: x**2 + sin(x), element-wise."""
    result = data_chunk.apply(lambda x: x ** 2 + np.sin(x))
    return result

def parallel_process(data, num_workers=4):
    """Process a large Series in parallel by splitting it into chunks.

    The chunks are processed in separate worker processes and the partial
    results are concatenated back in order.
    """
    # BUG FIX: max(1, ...) guards against len(data) < num_workers, where the
    # integer division yields 0 and range() raises "step must not be zero".
    chunk_size = max(1, len(data) // num_workers)
    chunks = [data[i:i+chunk_size] for i in range(0, len(data), chunk_size)]

    # Fan the chunks out to worker processes
    with ProcessPoolExecutor(max_workers=num_workers) as executor:
        results = list(executor.map(process_chunk, chunks))

    # Merge the partial results, preserving order
    return pd.concat(results)

if __name__ == '__main__':
    # Create a large dataset
    data = pd.Series(np.random.rand(10000000))

    # Process it in parallel and time the run
    import time
    start = time.time()
    result = parallel_process(data, num_workers=4)
    print(f"耗時: {time.time() - start:.2f}s")

案例 3:混合 Thread 和 Process

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import requests
import time

def fetch_url(url):
    """I/O-bound: download one web page and return its body text."""
    response = requests.get(url)
    return response.text

def analyze_text(text):
    """CPU-bound: analyze the text."""
    # Simulated analysis: count words and characters
    word_count = len(text.split())
    char_count = len(text)
    return {'words': word_count, 'chars': char_count}

def process_urls(urls):
    """Two-phase pipeline mixing thread and process pools."""
    # Phase 1: download with threads (I/O-bound)
    with ThreadPoolExecutor(max_workers=10) as executor:
        texts = list(executor.map(fetch_url, urls))
        print(f"下載完成 {len(texts)} 個網頁")

    # Phase 2: analyze with processes (CPU-bound)
    with ProcessPoolExecutor(max_workers=4) as executor:
        results = list(executor.map(analyze_text, texts))
        print(f"分析完成 {len(results)} 個網頁")

    return results

if __name__ == '__main__':
    urls = [f'https://httpbin.org/html' for _ in range(20)]
    results = process_urls(urls)
    print(results)

案例 4:進度追蹤

from concurrent.futures import ThreadPoolExecutor, as_completed
import time

def task(n):
    time.sleep(1)
    return n * 2

def process_with_progress(items, max_workers=5):
    """Run task() over items while printing a live progress percentage."""
    total = len(items)
    completed = 0

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Submit everything, remembering which item each Future belongs to
        futures = {executor.submit(task, item): item for item in items}

        # Update progress as futures complete
        for future in as_completed(futures):
            item = futures[future]
            try:
                result = future.result()
                completed += 1
                progress = completed / total * 100
                print(f"進度: {progress:.1f}% ({completed}/{total})", end='\r')
            except Exception as e:
                print(f"\n錯誤: {item}, {e}")

    print(f"\n完成!")

# Usage
items = list(range(100))
process_with_progress(items, max_workers=10)

6️⃣ 最佳實踐

1. 選擇合適的 Worker 數量

import os
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

# Thread pool: I/O-bound work.
# Many workers are fine (dozens to hundreds).
with ThreadPoolExecutor(max_workers=50) as executor:
    pass

# Process pool: CPU-bound work.
# Use roughly one worker per CPU core.
num_workers = os.cpu_count()  # typically 4, 8, 16
with ProcessPoolExecutor(max_workers=num_workers) as executor:
    pass

2. 妥善處理異常

from concurrent.futures import ThreadPoolExecutor

def safe_task(item):
    # NOTE: risky_operation and items are placeholders defined elsewhere
    try:
        return risky_operation(item)
    except Exception as e:
        # Turn the failure into a sentinel string instead of raising,
        # so one bad item does not abort the whole map()
        return f"Error: {e}"

with ThreadPoolExecutor() as executor:
    results = executor.map(safe_task, items)
    for result in results:
        if isinstance(result, str) and result.startswith("Error"):
            print(f"處理錯誤: {result}")

3. 使用 timeout 避免阻塞

from concurrent.futures import ThreadPoolExecutor, TimeoutError
import time  # BUG FIX: time was used below but never imported

def long_task():
    """Simulate a task that runs far longer than we are willing to wait."""
    time.sleep(100)
    return "Done"

with ThreadPoolExecutor() as executor:
    future = executor.submit(long_task)
    try:
        result = future.result(timeout=5)  # wait at most 5 seconds
    except TimeoutError:
        print("任務超時")
        # cancel() only succeeds while the task is still queued; once it is
        # running it returns False, and the with-block's shutdown will still
        # wait for the task to finish.
        future.cancel()  # attempt to cancel

4. 使用 context manager

# ✅ Correct: the context manager cleans up automatically
# NOTE: `task` and `items` are placeholders defined elsewhere
with ThreadPoolExecutor() as executor:
    results = executor.map(task, items)

# ❌ Risky: easy to forget the shutdown call
executor = ThreadPoolExecutor()
results = executor.map(task, items)
executor.shutdown(wait=True)  # manual shutdown

✅ 重點回顧

concurrent.futures 優勢:

  • ✅ 統一的 API(Thread 和 Process)
  • ✅ 自動管理 Pool
  • ✅ 更容易獲取結果
  • ✅ 更好的錯誤處理

兩種 Executor:

  • ThreadPoolExecutor - I/O 密集型
  • ProcessPoolExecutor - CPU 密集型

兩種提交方式:

  • submit() - 逐個提交,靈活
  • map() - 批量提交,保持順序

Future 物件:

  • result() - 獲取結果(阻塞)
  • cancel() - 取消任務
  • add_done_callback() - 添加回調

工具函式:

  • as_completed() - 按完成順序
  • wait() - 等待多個 Future

關鍵:

  • ✅ 比 threading/multiprocessing 更易用
  • ✅ 適合大部分併發場景
  • ✅ 是 Python 併發的首選方案

上一篇: 05-3. multiprocessing 模組完整指南 下一篇: 05-5. asyncio 基礎概念


最後更新:2025-01-06
