05-3. Complete Guide to the multiprocessing Module
⏱️ Reading time: 20 minutes 🎯 Difficulty: ⭐⭐⭐ (Intermediate)
🎯 What This Article Covers
A complete tour of Python's multiprocessing module: Process creation, Pool management, data sharing, inter-process communication (IPC), and the other core features.
📚 multiprocessing Overview
multiprocessing is the standard-library module for creating and managing multiple Processes. It sidesteps the GIL, enabling true parallel computation.
from multiprocessing import Process, Pool, Queue, Pipe, Manager
from multiprocessing import Value, Array, Lock

1️⃣ Basics: Creating a Process
Method 1: A Function as the target
from multiprocessing import Process
import os
import time
def worker(name, sleep_time):
    """Worker function"""
    print(f"[{name}] PID: {os.getpid()}, parent PID: {os.getppid()}")
    print(f"[{name}] Starting work...")
    time.sleep(sleep_time)
    print(f"[{name}] Work done")

if __name__ == '__main__':
    # Create the Processes
    p1 = Process(target=worker, args=('Worker-1', 2))
    p2 = Process(target=worker, args=('Worker-2', 3))
    # Start them
    p1.start()
    p2.start()
    # Wait for completion
    p1.join()
    p2.join()
    print("All Processes finished")

Output:
[Worker-1] PID: 1001, parent PID: 1000
[Worker-1] Starting work...
[Worker-2] PID: 1002, parent PID: 1000
[Worker-2] Starting work...
[Worker-1] Work done
[Worker-2] Work done
All Processes finished

Method 2: Subclassing Process
from multiprocessing import Process
import os
class WorkerProcess(Process):
    def __init__(self, name, task_id):
        super().__init__()
        self.task_name = name
        self.task_id = task_id

    def run(self):
        """Main logic executed by the Process"""
        print(f"[{self.task_name}] PID: {os.getpid()}")
        print(f"[{self.task_name}] Handling task {self.task_id}")
        # Do the work
        result = self.process_task()
        print(f"[{self.task_name}] Result: {result}")

    def process_task(self):
        """Task-processing logic"""
        return sum(range(self.task_id * 1000000))

if __name__ == '__main__':
    # Create the custom Processes
    workers = [WorkerProcess(f'Worker-{i}', i) for i in range(3)]
    # Start them
    for w in workers:
        w.start()
    # Wait for them
    for w in workers:
        w.join()

Process Attributes and Methods
from multiprocessing import Process
import time
def worker():
    time.sleep(2)

if __name__ == '__main__':
    p = Process(target=worker, name='MyWorker')
    # Process attributes
    print(f"Name: {p.name}")           # MyWorker
    print(f"Daemon: {p.daemon}")       # False
    print(f"PID: {p.pid}")             # None (not started yet)
    print(f"Alive: {p.is_alive()}")    # False
    # Start the Process
    p.start()
    print(f"PID: {p.pid}")             # e.g. 1001 (started)
    print(f"Alive: {p.is_alive()}")    # True
    # Wait for completion
    p.join(timeout=3)                  # wait at most 3 seconds
    print(f"Exit code: {p.exitcode}")  # 0 (clean exit)
    print(f"Alive: {p.is_alive()}")    # False
2️⃣ Advanced: Process Pools
Basic Pool Usage
from multiprocessing import Pool
import os  # needed for os.getpid() below
import time

def square(x):
    """Compute a square"""
    print(f"Computing {x}^2, PID: {os.getpid()}")
    time.sleep(0.5)
    return x * x

if __name__ == '__main__':
    # Create a Pool with 4 worker Processes
    with Pool(processes=4) as pool:
        # Method 1: map (blocking)
        results = pool.map(square, range(10))
        print(f"Results: {results}")
        # Output: Results: [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

The Four Pool Execution Methods
1. map(): blocking, ordered
from multiprocessing import Pool

def process_data(x):
    return x * 2

if __name__ == '__main__':
    with Pool(4) as pool:
        results = pool.map(process_data, [1, 2, 3, 4, 5])
        print(results)
        # Output: [2, 4, 6, 8, 10] (same order as the input)

2. map_async(): non-blocking, ordered
from multiprocessing import Pool
import time

def process_data(x):
    time.sleep(1)
    return x * 2

if __name__ == '__main__':
    with Pool(4) as pool:
        # Submit without blocking
        result_async = pool.map_async(process_data, [1, 2, 3, 4, 5])
        print("Tasks submitted; free to do other work...")
        time.sleep(0.5)
        # Wait for the results
        results = result_async.get(timeout=5)
        print(results)
        # Output: [2, 4, 6, 8, 10]

3. apply(): blocking, single task
from multiprocessing import Pool

def add(x, y):
    return x + y

if __name__ == '__main__':
    with Pool(4) as pool:
        # A single task (blocking)
        result = pool.apply(add, (3, 5))
        print(result)  # Output: 8

4. apply_async(): non-blocking, single task
from multiprocessing import Pool
import time

def slow_add(x, y):
    time.sleep(2)
    return x + y

if __name__ == '__main__':
    with Pool(4) as pool:
        # Submit several tasks without blocking
        results = []
        for i in range(5):
            result = pool.apply_async(slow_add, (i, i))
            results.append(result)
        print("All tasks submitted")
        # Fetch the results
        for i, r in enumerate(results):
            print(f"Task {i} result: {r.get()}")

Advanced Pool Features
Progress Tracking
from multiprocessing import Pool
import time

def process_item(x):
    time.sleep(0.1)
    return x * x

if __name__ == '__main__':
    items = range(100)
    total = len(items)
    with Pool(4) as pool:
        # Use imap for incremental progress reporting
        results = []
        for i, result in enumerate(pool.imap(process_item, items), 1):
            results.append(result)
            print(f"Progress: {i}/{total} ({i/total*100:.1f}%)", end='\r')
    print(f"\nDone! {len(results)} results in total")
Error Handling
from multiprocessing import Pool

def risky_task(x):
    if x == 5:
        raise ValueError(f"Error: x = {x}")
    return x * 2

if __name__ == '__main__':
    with Pool(4) as pool:
        results = []
        for i in range(10):
            result = pool.apply_async(risky_task, (i,))
            results.append((i, result))
        # Fetch the results and handle errors
        for i, r in results:
            try:
                value = r.get(timeout=1)
                print(f"Task {i}: {value}")
            except ValueError as e:
                print(f"Task {i} raised an error: {e}")

3️⃣ Data Sharing: One Program, Multiple Processes
The Problem: Processes Can't Share Variables Directly
from multiprocessing import Process

counter = 0  # global variable

def increment():
    global counter
    for _ in range(100000):
        counter += 1
    print(f"Process counter: {counter}")

if __name__ == '__main__':
    p1 = Process(target=increment)
    p2 = Process(target=increment)
    p1.start(); p2.start()
    p1.join(); p2.join()
    print(f"Main counter: {counter}")
    # Output:
    # Process counter: 100000  ← Process 1's own copy
    # Process counter: 100000  ← Process 2's own copy
    # Main counter: 0          ← unchanged in the main Process

Solution 1: Value and Array
from multiprocessing import Process, Value, Array, Lock

def increment_shared(counter, lock):
    """Increment via shared memory"""
    for _ in range(100000):
        with lock:  # a Lock is required
            counter.value += 1

def update_array(shared_array, index, value, lock):
    """Modify a shared array"""
    with lock:
        shared_array[index] = value

if __name__ == '__main__':
    # Shared integer
    counter = Value('i', 0)  # 'i' = int
    lock = Lock()
    # Create the Processes
    p1 = Process(target=increment_shared, args=(counter, lock))
    p2 = Process(target=increment_shared, args=(counter, lock))
    p1.start(); p2.start()
    p1.join(); p2.join()
    print(f"Counter: {counter.value}")  # Output: 200000
    # Shared array
    arr = Array('d', [0.0, 0.0, 0.0])  # 'd' = double
    processes = []
    for i in range(3):
        p = Process(target=update_array, args=(arr, i, i * 10.5, lock))
        p.start()
        processes.append(p)
    for p in processes:
        p.join()
    print(f"Array: {list(arr)}")  # Output: [0.0, 10.5, 21.0]

Value and Array type codes:
'c' : ctypes.c_char    # character
'i' : ctypes.c_int     # integer
'f' : ctypes.c_float   # float
'd' : ctypes.c_double  # double-precision float
'b' : ctypes.c_byte    # byte

Solution 2: Manager
from multiprocessing import Process, Manager

def worker(shared_dict, shared_list, worker_id):
    """Share complex data structures through a Manager"""
    # Modify the shared dict
    shared_dict[worker_id] = f"Worker-{worker_id} result"
    # Modify the shared list
    shared_list.append(worker_id * 10)

if __name__ == '__main__':
    # Create a Manager
    with Manager() as manager:
        # Shared dict and list
        shared_dict = manager.dict()
        shared_list = manager.list()
        # Create several Processes
        processes = []
        for i in range(5):
            p = Process(target=worker, args=(shared_dict, shared_list, i))
            p.start()
            processes.append(p)
        for p in processes:
            p.join()
        print(f"Dict: {dict(shared_dict)}")
        print(f"List: {list(shared_list)}")
        # Output:
        # Dict: {0: 'Worker-0 result', 1: 'Worker-1 result', ...}
        # List: [0, 10, 20, 30, 40] (append order may vary)

Types supported by Manager:
manager.list()       # list
manager.dict()       # dict
manager.Queue()      # queue
manager.Lock()       # lock
manager.Value()      # value
manager.Array()      # array
manager.Namespace()  # namespace

Solution 3: Namespace (The Most Flexible)
from multiprocessing import Process, Manager

def worker(namespace, worker_id):
    """Share attributes through a Namespace"""
    namespace.counter += 1
    namespace.results.append(worker_id)
    namespace.status = f"Worker {worker_id} done"

if __name__ == '__main__':
    with Manager() as manager:
        # Create the Namespace
        ns = manager.Namespace()
        ns.counter = 0
        ns.results = manager.list()
        ns.status = "initialized"
        # Create the Processes
        processes = [Process(target=worker, args=(ns, i)) for i in range(5)]
        for p in processes:
            p.start()
        for p in processes:
            p.join()
        print(f"Counter: {ns.counter}")
        print(f"Results: {list(ns.results)}")
        print(f"Status: {ns.status}")
4️⃣ Inter-Process Communication (IPC)
Method 1: Queue
from multiprocessing import Process, Queue
import time

def producer(queue, n):
    """Producer"""
    for i in range(n):
        item = f"Item-{i}"
        queue.put(item)
        print(f"Produced: {item}")
        time.sleep(0.5)
    queue.put(None)  # sentinel marking the end

def consumer(queue):
    """Consumer"""
    while True:
        item = queue.get()
        if item is None:
            break
        print(f"Consumed: {item}")
        time.sleep(1)

if __name__ == '__main__':
    # Create the Queue
    q = Queue()
    # Create the producer and consumer
    p1 = Process(target=producer, args=(q, 5))
    p2 = Process(target=consumer, args=(q,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()

Method 2: Pipe
from multiprocessing import Process, Pipe

def sender(conn):
    """Sending end"""
    conn.send("Hello")
    conn.send({"key": "value"})
    conn.send([1, 2, 3])
    conn.close()

def receiver(conn):
    """Receiving end"""
    msg1 = conn.recv()
    msg2 = conn.recv()
    msg3 = conn.recv()
    print(f"Received: {msg1}, {msg2}, {msg3}")
    conn.close()

if __name__ == '__main__':
    # Create the Pipe
    parent_conn, child_conn = Pipe()
    # Create the Processes
    p1 = Process(target=sender, args=(child_conn,))
    p2 = Process(target=receiver, args=(parent_conn,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()

Queue vs Pipe:
# Queue: many-to-many, thread- and process-safe
# Pipe: point-to-point, faster, but does not support multiple writers on one end
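The example above uses each connection in one direction only, but Pipe() is duplex by default: either end can both send and recv. A small two-way sketch (the echo worker is illustrative):

from multiprocessing import Process, Pipe

def echo(conn):
    # Reply to every message until the other end closes.
    while True:
        try:
            msg = conn.recv()
        except EOFError:
            break
        conn.send(f"echo: {msg}")
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()  # duplex by default
    p = Process(target=echo, args=(child_conn,))
    p.start()
    child_conn.close()  # hygiene: drop the parent's copy of the child end
    parent_conn.send("ping")
    print(parent_conn.recv())  # echo: ping
    parent_conn.close()        # the child's recv() now raises EOFError
    p.join()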
5️⃣ Synchronization Primitives
Lock
from multiprocessing import Process, Lock, Value

def increment_with_lock(counter, lock):
    """Protect a shared resource with a Lock"""
    for _ in range(100000):
        lock.acquire()
        try:
            counter.value += 1
        finally:
            lock.release()

def increment_no_lock(counter):
    """No Lock (what NOT to do)"""
    for _ in range(100000):
        counter.value += 1  # race condition

if __name__ == '__main__':
    # With a Lock
    counter = Value('i', 0)
    lock = Lock()
    p1 = Process(target=increment_with_lock, args=(counter, lock))
    p2 = Process(target=increment_with_lock, args=(counter, lock))
    p1.start(); p2.start()
    p1.join(); p2.join()
    print(f"With Lock: {counter.value}")  # 200000
    # Without a Lock
    counter = Value('i', 0)
    p1 = Process(target=increment_no_lock, args=(counter,))
    p2 = Process(target=increment_no_lock, args=(counter,))
    p1.start(); p2.start()
    p1.join(); p2.join()
    print(f"Without Lock: {counter.value}")  # maybe 150000 (wrong)

Semaphore
from multiprocessing import Process, Semaphore
import time

def worker(semaphore, worker_id):
    """Cap the number of Processes running at once"""
    print(f"Worker {worker_id} waiting...")
    semaphore.acquire()
    try:
        print(f"Worker {worker_id} starting")
        time.sleep(2)
        print(f"Worker {worker_id} done")
    finally:
        semaphore.release()

if __name__ == '__main__':
    # At most 2 Processes run at the same time
    semaphore = Semaphore(2)
    # Create 5 Processes
    processes = [Process(target=worker, args=(semaphore, i)) for i in range(5)]
    for p in processes:
        p.start()
    for p in processes:
        p.join()
    # Output:
    # Worker 0 waiting...
    # Worker 0 starting
    # Worker 1 waiting...
    # Worker 1 starting
    # Worker 2 waiting...  ← blocked
    # ...

6️⃣ Real-World Examples
Example 1: Batch Image Processing
from multiprocessing import Pool
from PIL import Image
import os

def process_image(image_path):
    """Process a single image"""
    try:
        # Open the image
        img = Image.open(image_path)
        # Resize it
        img = img.resize((800, 600))
        # Apply a filter
        # img = img.filter(ImageFilter.BLUR)
        # Save it
        output_path = f"output/{os.path.basename(image_path)}"
        img.save(output_path, quality=85)
        return f"Done: {image_path}"
    except Exception as e:
        return f"Error: {image_path}, {e}"

if __name__ == '__main__':
    # Collect all the images
    image_paths = [f"images/img_{i}.jpg" for i in range(100)]
    # Process them in batch with a Process Pool
    with Pool(processes=8) as pool:
        results = pool.map(process_image, image_paths)
    # Tally the results
    success = sum(1 for r in results if r.startswith("Done"))
    print(f"Succeeded: {success}/{len(results)}")

Example 2: A Web Crawler (Mixing Threads and Processes)
from multiprocessing import Process, Queue
from threading import Thread
from queue import Empty  # multiprocessing.Queue raises queue.Empty on timeout
import requests

def crawler_process(url_queue, result_queue, process_id):
    """Each Process crawls with several Threads"""
    def crawl(url):
        try:
            response = requests.get(url, timeout=5)
            return (url, response.status_code, len(response.content))
        except Exception as e:
            return (url, 0, str(e))

    threads = []
    while not url_queue.empty():
        try:
            url = url_queue.get(timeout=1)
            t = Thread(target=lambda u: result_queue.put(crawl(u)), args=(url,))
            t.start()
            threads.append(t)
        except Empty:
            break
    for t in threads:
        t.join()
    print(f"Process {process_id} done")

if __name__ == '__main__':
    # Prepare the URLs
    urls = ["https://httpbin.org/delay/1" for _ in range(100)]
    url_queue = Queue()
    result_queue = Queue()
    for url in urls:
        url_queue.put(url)
    # Create 4 Processes
    processes = []
    for i in range(4):
        p = Process(target=crawler_process, args=(url_queue, result_queue, i))
        p.start()
        processes.append(p)
    for p in processes:
        p.join()
    # Collect the results
    results = []
    while not result_queue.empty():
        results.append(result_queue.get())
    print(f"Crawled {len(results)} URLs in total")

7️⃣ Best Practices
1. Always Use if __name__ == '__main__'
Under the spawn start method (the default on Windows and macOS), every child re-imports your module, so unguarded module-level Process creation recurses endlessly.
# ✅ Correct
if __name__ == '__main__':
    p = Process(target=worker)
    p.start()

# ❌ Wrong (on Windows this spawns Processes endlessly)
p = Process(target=worker)
p.start()

2. Manage Processes with a Pool
# ❌ Wrong: creating too many Processes by hand
processes = [Process(target=work) for _ in range(1000)]

# ✅ Correct: use a Pool
with Pool(8) as pool:
    pool.map(work, range(1000))
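A fixed pool size like 8 rarely fits every machine; a common pattern is to size the pool from the CPU count instead:

import os
from multiprocessing import Pool

if __name__ == '__main__':
    # os.cpu_count() can return None, so keep a fallback.
    n_workers = os.cpu_count() or 4
    with Pool(n_workers) as pool:
        results = pool.map(abs, range(-5, 5))
    print(n_workers, results)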
3. Pick the Right Sharing Mechanism
# Simple types → Value/Array (fast)
counter = Value('i', 0)

# Complex types → Manager (slower but flexible)
shared_dict = Manager().dict()

# Large data → consider not sharing at all; pass results through a Queue, as sketched below
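As a sketch of the "pass results instead of sharing" approach above (the chunking here is illustrative): each worker keeps its data private and only ships a small result back through a Queue:

from multiprocessing import Process, Queue

def compute(chunk, out_q):
    # All the data stays local; only the sum travels between Processes.
    out_q.put(sum(chunk))

if __name__ == '__main__':
    out_q = Queue()
    chunks = [range(0, 500), range(500, 1000)]
    processes = [Process(target=compute, args=(c, out_q)) for c in chunks]
    for p in processes:
        p.start()
    # Drain the queue before join() to avoid blocking on a full pipe buffer.
    total = sum(out_q.get() for _ in processes)
    for p in processes:
        p.join()
    print(total)  # 499500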
4. Handle Exceptions Properly
from multiprocessing import Pool

def safe_worker(x):
    try:
        return risky_operation(x)
    except Exception as e:
        return f"Error: {e}"

if __name__ == '__main__':
    with Pool(4) as pool:
        results = pool.map(safe_worker, data)
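Besides wrapping the worker in try/except, apply_async() also accepts callback and error_callback arguments, so successes and failures can each be routed to a handler in the parent process; a minimal sketch:

from multiprocessing import Pool

def risky(x):
    if x == 3:
        raise ValueError(f"bad input: {x}")
    return x * 2

def on_error(exc):
    # Called in the parent process when a task raises.
    print(f"Task failed: {exc}")

if __name__ == '__main__':
    with Pool(4) as pool:
        for i in range(5):
            pool.apply_async(risky, (i,), callback=print, error_callback=on_error)
        pool.close()  # no more tasks
        pool.join()   # wait so the callbacks get a chance to run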
✅ Key Takeaways
Process creation:
- Process(target=func) — basic usage
- Subclassing Process — object-oriented style
- Pool — managing a pool of Processes

Data sharing:
- Value, Array — simple types
- Manager — complex types
- Namespace — flexible attribute sharing

Inter-process communication:
- Queue — many-to-many, thread-safe
- Pipe — point-to-point, faster

Synchronization:
- Lock — mutual exclusion
- Semaphore — counting semaphore

Key points:
- ✅ Use it for CPU-bound tasks
- ✅ Sidesteps the GIL for true parallelism
- ✅ Remember if __name__ == '__main__'
- ✅ Prefer a Pool for Process management
Previous: 05-2. Complete Guide to the threading Module | Next: 05-4. Guide to concurrent.futures
Last updated: 2025-01-06