目錄
03-2. Pipe(管道)詳解
⏱️ 閱讀時間: 12 分鐘 🎯 難度: ⭐⭐ (簡單)
🤔 一句話解釋
Pipe(管道)是一種單向或雙向的通訊通道,主要用於父子 Process 或相關 Process 之間的資料傳輸。
📞 用電話專線來比喻
Process A Process B
├─ 寫入端 ──────────→ 讀取端
│
└─ 就像電話專線:
- 一端說話(寫入)
- 另一端聽(讀取)
- 點對點連接🔧 Pipe 的類型
1. 單向 Pipe (One-way Pipe)
from multiprocessing import Process, Pipe
def sender(conn):
"""發送端"""
messages = ["Hello", "World", "from", "Process A"]
for msg in messages:
conn.send(msg)
print(f"發送: {msg}")
conn.close()
def receiver(conn):
"""接收端"""
while True:
try:
msg = conn.recv()
print(f"收到: {msg}")
except EOFError:
break
conn.close()
# 創建 Pipe(默認是雙向的)
parent_conn, child_conn = Pipe()
# 創建 Process
p1 = Process(target=sender, args=(parent_conn,))
p2 = Process(target=receiver, args=(child_conn,))
p1.start()
p2.start()
p1.join()
p2.join()輸出:
發送: Hello
發送: World
發送: from
發送: Process A
收到: Hello
收到: World
收到: from
收到: Process A2. 雙向 Pipe (Duplex Pipe)
from multiprocessing import Process, Pipe
import time
def worker1(conn):
"""Worker 1:發送並接收"""
# 發送訊息
conn.send("來自 Worker 1 的問候")
print("[Worker 1] 已發送訊息")
# 接收回應
time.sleep(0.5)
response = conn.recv()
print(f"[Worker 1] 收到回應: {response}")
conn.close()
def worker2(conn):
"""Worker 2:接收並回應"""
# 接收訊息
msg = conn.recv()
print(f"[Worker 2] 收到訊息: {msg}")
# 發送回應
conn.send("Worker 2 已收到!")
print("[Worker 2] 已發送回應")
conn.close()
# 創建雙向 Pipe
conn1, conn2 = Pipe(duplex=True) # duplex=True 是默認值
p1 = Process(target=worker1, args=(conn1,))
p2 = Process(target=worker2, args=(conn2,))
p1.start()
p2.start()
p1.join()
p2.join()輸出:
[Worker 1] 已發送訊息
[Worker 2] 收到訊息: 來自 Worker 1 的問候
[Worker 2] 已發送回應
[Worker 1] 收到回應: Worker 2 已收到!🎯 Pipe 的特性
特性 1:點對點通訊
from multiprocessing import Process, Pipe
def endpoint_a(conn):
conn.send("A → B")
print(f"[A] 發送完成")
conn.close()
def endpoint_b(conn):
msg = conn.recv()
print(f"[B] 收到: {msg}")
conn.close()
conn_a, conn_b = Pipe()
# 只能是 A 和 B 兩個端點通訊
Process(target=endpoint_a, args=(conn_a,)).start()
Process(target=endpoint_b, args=(conn_b,)).start()
# ⚠️ 不能有第三個 Process 加入這個 Pipe特性 2:阻塞式 I/O
from multiprocessing import Process, Pipe
import time
def slow_sender(conn):
time.sleep(3) # 延遲 3 秒
conn.send("終於來了!")
conn.close()
def receiver(conn):
print("等待接收資料...")
msg = conn.recv() # ⏸️ 阻塞,直到有資料
print(f"收到: {msg}")
conn.close()
conn1, conn2 = Pipe()
Process(target=slow_sender, args=(conn1,)).start()
Process(target=receiver, args=(conn2,)).start()輸出:
等待接收資料...
(等待 3 秒...)
收到: 終於來了!特性 3:FIFO(先進先出)
from multiprocessing import Process, Pipe
def sender(conn):
messages = ["第一條", "第二條", "第三條"]
for msg in messages:
conn.send(msg)
conn.close()
def receiver(conn):
for _ in range(3):
msg = conn.recv()
print(f"收到: {msg}")
conn.close()
conn1, conn2 = Pipe()
Process(target=sender, args=(conn1,)).start()
Process(target=receiver, args=(conn2,)).start()輸出:
收到: 第一條 ← 先發送的先收到
收到: 第二條
收到: 第三條🔄 實際案例 1:父子 Process 通訊
from multiprocessing import Process, Pipe
import os
def child_process(conn):
"""子 Process"""
# 接收父 Process 的任務
task = conn.recv()
print(f"[子 Process {os.getpid()}] 收到任務: {task}")
# 執行任務
result = task.upper() # 轉大寫
# 回傳結果
conn.send(result)
print(f"[子 Process {os.getpid()}] 已回傳結果")
conn.close()
# 主 Process
parent_conn, child_conn = Pipe()
# 創建子 Process
p = Process(target=child_process, args=(child_conn,))
p.start()
# 父 Process 發送任務
task = "hello world"
parent_conn.send(task)
print(f"[父 Process {os.getpid()}] 已發送任務: {task}")
# 父 Process 接收結果
result = parent_conn.recv()
print(f"[父 Process {os.getpid()}] 收到結果: {result}")
p.join()
parent_conn.close()輸出:
[父 Process 12345] 已發送任務: hello world
[子 Process 12346] 收到任務: hello world
[子 Process 12346] 已回傳結果
[父 Process 12345] 收到結果: HELLO WORLD📊 實際案例 2:工作分配系統
from multiprocessing import Process, Pipe
import time
import random
def worker(worker_id, conn):
"""工作者 Process"""
while True:
try:
# 接收任務
task = conn.recv()
if task == "STOP":
print(f"[Worker {worker_id}] 收到停止信號")
break
print(f"[Worker {worker_id}] 開始處理: {task}")
# 模擬處理時間
time.sleep(random.uniform(0.5, 1.5))
# 回傳結果
result = f"Task {task} 完成"
conn.send(result)
print(f"[Worker {worker_id}] 完成: {task}")
except EOFError:
break
conn.close()
# 主 Process(任務分配器)
def main():
# 創建 3 個 Worker
workers = []
for i in range(3):
parent_conn, child_conn = Pipe()
p = Process(target=worker, args=(i, child_conn))
p.start()
workers.append((p, parent_conn))
# 分配任務
tasks = [f"Task-{i}" for i in range(10)]
worker_idx = 0
for task in tasks:
_, conn = workers[worker_idx]
conn.send(task)
print(f"[主程式] 分配 {task} 給 Worker {worker_idx}")
# 輪流分配
worker_idx = (worker_idx + 1) % len(workers)
# 接收所有結果
for _ in tasks:
for _, conn in workers:
if conn.poll(): # 檢查是否有資料
result = conn.recv()
print(f"[主程式] 收到結果: {result}")
# 停止所有 Worker
for _, conn in workers:
conn.send("STOP")
# 等待所有 Worker 結束
for p, conn in workers:
p.join()
conn.close()
if __name__ == '__main__':
main()🔐 實際案例 3:安全的資料傳輸
from multiprocessing import Process, Pipe
import pickle
import hashlib
def secure_sender(conn):
"""安全發送端"""
data = {
'user': 'alice',
'password': 'secret123',
'balance': 10000
}
# 序列化資料
serialized = pickle.dumps(data)
# 計算校驗和
checksum = hashlib.md5(serialized).hexdigest()
# 發送資料和校驗和
conn.send((serialized, checksum))
print("[發送端] 已發送資料和校驗和")
conn.close()
def secure_receiver(conn):
"""安全接收端"""
# 接收資料
serialized, received_checksum = conn.recv()
# 驗證校驗和
calculated_checksum = hashlib.md5(serialized).hexdigest()
if calculated_checksum == received_checksum:
print("[接收端] ✅ 校驗和驗證通過")
data = pickle.loads(serialized)
print(f"[接收端] 資料: {data}")
else:
print("[接收端] ❌ 校驗和驗證失敗,資料可能損壞")
conn.close()
conn1, conn2 = Pipe()
Process(target=secure_sender, args=(conn1,)).start()
Process(target=secure_receiver, args=(conn2,)).start()輸出:
[發送端] 已發送資料和校驗和
[接收端] ✅ 校驗和驗證通過
[接收端] 資料: {'user': 'alice', 'password': 'secret123', 'balance': 10000}⚠️ Pipe 的限制與注意事項
限制 1:只能兩個端點
from multiprocessing import Process, Pipe
conn1, conn2 = Pipe()
# ✅ 正確:兩個 Process
def p1_func(conn):
conn.send("Hello")
conn.close()
def p2_func(conn):
msg = conn.recv()
print(msg)
conn.close()
Process(target=p1_func, args=(conn1,)).start()
Process(target=p2_func, args=(conn2,)).start()
# ❌ 錯誤:不能有第三個 Process
# 如果需要多對多通訊,使用 Queue限制 2:資料大小限制
from multiprocessing import Pipe
import sys
conn1, conn2 = Pipe()
# ⚠️ 發送過大的資料可能會有問題
huge_data = "x" * (100 * 1024 * 1024) # 100 MB
try:
conn1.send(huge_data)
print(f"資料大小: {sys.getsizeof(huge_data) / 1024 / 1024:.2f} MB")
except Exception as e:
print(f"錯誤: {e}")
# 💡 建議:大資料使用 Shared Memory注意事項 1:記得關閉連接
from multiprocessing import Process, Pipe
def sender(conn):
conn.send("Hello")
conn.close() # ✅ 記得關閉
def receiver(conn):
msg = conn.recv()
print(msg)
conn.close() # ✅ 記得關閉
conn1, conn2 = Pipe()
p1 = Process(target=sender, args=(conn1,))
p2 = Process(target=receiver, args=(conn2,))
p1.start()
p2.start()
p1.join()
p2.join()
# ⚠️ 如果不關閉,可能導致資源洩漏注意事項 2:避免死鎖
from multiprocessing import Process, Pipe
def deadlock_example(conn1, conn2):
# ❌ 危險:兩個 Process 都先 recv()
msg = conn1.recv() # 阻塞,等待 conn2 發送
conn2.send("Response") # 永遠不會執行到這裡
# 💀 兩個 Process 都在等待對方,造成死鎖解決方案:
def safe_example(conn1, conn2):
# ✅ 正確:一個先 send,一個先 recv
conn2.send("Request")
msg = conn1.recv()🔍 poll() 檢查是否有資料
from multiprocessing import Process, Pipe
import time
def sender(conn):
time.sleep(2) # 延遲 2 秒
conn.send("Hello")
conn.close()
def receiver(conn):
print("等待資料...")
# 使用 poll() 檢查
while not conn.poll(): # 非阻塞檢查
print("還沒有資料,繼續等待...")
time.sleep(0.5)
# 有資料了,接收
msg = conn.recv()
print(f"收到: {msg}")
conn.close()
conn1, conn2 = Pipe()
Process(target=sender, args=(conn1,)).start()
Process(target=receiver, args=(conn2,)).start()輸出:
等待資料...
還沒有資料,繼續等待...
還沒有資料,繼續等待...
還沒有資料,繼續等待...
收到: Hello📈 Pipe vs Queue
| 特性 | Pipe | Queue |
|---|---|---|
| 端點數量 | 2 個 | 多個 |
| 速度 | 更快 | 較慢 |
| 使用場景 | 父子 Process | 生產者/消費者 |
| 複雜度 | 低 | 中 |
| 線程安全 | 否 | 是 |
# Pipe:適合簡單的點對點通訊
conn1, conn2 = Pipe()
# Queue:適合多對多通訊
from multiprocessing import Queue
queue = Queue()💡 最佳實踐
1. 使用 with 語句(如果可能)
# 雖然 Pipe 沒有內建 context manager,但可以確保關閉
def safe_communication():
conn1, conn2 = Pipe()
try:
conn1.send("Hello")
msg = conn2.recv()
return msg
finally:
conn1.close()
conn2.close()2. 設置超時
from multiprocessing import Pipe
import time
conn1, conn2 = Pipe()
# 使用 poll() 設置超時
if conn2.poll(timeout=5): # 最多等 5 秒
msg = conn2.recv()
print(f"收到: {msg}")
else:
print("超時,沒有收到資料")3. 適當的錯誤處理
from multiprocessing import Process, Pipe
def robust_receiver(conn):
try:
while True:
if conn.poll():
msg = conn.recv()
print(f"收到: {msg}")
else:
break
except EOFError:
print("連接已關閉")
except Exception as e:
print(f"錯誤: {e}")
finally:
conn.close()✅ 重點回顧
Pipe 的特性:
- 點對點通訊(只能兩個端點)
- 可以是單向或雙向
- FIFO(先進先出)
- 阻塞式 I/O
- 速度快,適合簡單場景
Pipe 的方法:
send(obj)- 發送對象recv()- 接收對象(阻塞)poll(timeout)- 檢查是否有資料(非阻塞)close()- 關閉連接
適用場景:
- ✅ 父子 Process 通訊
- ✅ 簡單的點對點資料交換
- ✅ 需要雙向通訊
- ❌ 多對多通訊(使用 Queue)
- ❌ 大量資料傳輸(使用 Shared Memory)
注意事項:
- 記得關閉連接
- 避免死鎖(注意 send/recv 順序)
- 不適合大資料傳輸
- 只能兩個端點
上一篇: 03-1. IPC 概述 下一篇: 03-3. Message Queue 詳解
最後更新:2025-01-06