目錄
03-3. Message Queue(消息佇列)詳解
⏱️ 閱讀時間: 15 分鐘 🎯 難度: ⭐⭐⭐ (中等)
🤔 一句話解釋
Message Queue(消息佇列)是一種多對多的 IPC 機制,允許多個生產者和消費者通過 FIFO 佇列交換訊息。
📨 用郵件系統來比喻
生產者(發件人) 消息佇列(郵局) 消費者(收件人)
│ ┌──────┐ │
├──── 訊息 1 ──────→ │訊息 1 │ ────→ 消費者 A
│ ├──────┤ │
├──── 訊息 2 ──────→ │訊息 2 │ ────→ 消費者 B
│ ├──────┤ │
└──── 訊息 3 ──────→ │訊息 3 │ ────→ 消費者 C
└──────┘
特點:
- 訊息排隊等待處理
- 先進先出(FIFO)
- 支援多個發送者和接收者
- 非同步通訊🎯 Queue 的基本用法
1. 簡單的生產者-消費者
from multiprocessing import Process, Queue
import time
def producer(queue):
"""生產者:產生資料"""
for i in range(5):
item = f"Item-{i}"
queue.put(item)
print(f"[生產者] 生產: {item}")
time.sleep(0.5)
print("[生產者] 完成生產")
def consumer(queue):
"""消費者:消費資料"""
while True:
try:
item = queue.get(timeout=2) # 最多等 2 秒
print(f"[消費者] 消費: {item}")
time.sleep(1)
except:
print("[消費者] 佇列為空,停止消費")
break
# 創建 Queue
queue = Queue()
# 創建 Process
p1 = Process(target=producer, args=(queue,))
p2 = Process(target=consumer, args=(queue,))
p1.start()
p2.start()
p1.join()
p2.join()輸出:
[生產者] 生產: Item-0
[消費者] 消費: Item-0
[生產者] 生產: Item-1
[生產者] 生產: Item-2
[消費者] 消費: Item-1
[生產者] 生產: Item-3
[生產者] 生產: Item-4
[生產者] 完成生產
[消費者] 消費: Item-2
[消費者] 消費: Item-3
[消費者] 消費: Item-4
[消費者] 佇列為空,停止消費2. 多個生產者,多個消費者
from multiprocessing import Process, Queue
import time
import random
def producer(queue, producer_id):
"""生產者"""
for i in range(3):
item = f"P{producer_id}-Item{i}"
queue.put(item)
print(f"[生產者 {producer_id}] 生產: {item}")
time.sleep(random.uniform(0.1, 0.5))
def consumer(queue, consumer_id):
"""消費者"""
while True:
try:
item = queue.get(timeout=2)
print(f"[消費者 {consumer_id}] 消費: {item}")
time.sleep(random.uniform(0.5, 1))
except:
print(f"[消費者 {consumer_id}] 停止")
break
# 創建 Queue
queue = Queue()
# 創建 3 個生產者
producers = [
Process(target=producer, args=(queue, i))
for i in range(3)
]
# 創建 2 個消費者
consumers = [
Process(target=consumer, args=(queue, i))
for i in range(2)
]
# 啟動所有 Process
for p in producers + consumers:
p.start()
# 等待所有 Process 完成
for p in producers + consumers:
p.join()輸出:
[生產者 0] 生產: P0-Item0
[生產者 1] 生產: P1-Item0
[生產者 2] 生產: P2-Item0
[消費者 0] 消費: P0-Item0
[消費者 1] 消費: P1-Item0
[生產者 0] 生產: P0-Item1
[生產者 1] 生產: P1-Item1
[消費者 0] 消費: P2-Item0
[生產者 2] 生產: P2-Item1
...📊 Queue 的方法
1. put() - 放入資料
from multiprocessing import Queue
queue = Queue()
# 放入資料
queue.put("Hello")
queue.put(123)
queue.put({'key': 'value'})
queue.put([1, 2, 3])
print(f"佇列大小: {queue.qsize()}")
# 輸出:佇列大小: 42. get() - 取出資料
from multiprocessing import Queue
queue = Queue()
queue.put("First")
queue.put("Second")
# 取出資料(FIFO)
item1 = queue.get() # "First"
item2 = queue.get() # "Second"
print(item1, item2)
# 輸出:First Second3. put_nowait() 和 get_nowait() - 非阻塞
from multiprocessing import Queue
from queue import Empty, Full
queue = Queue(maxsize=2) # 限制大小為 2
# 非阻塞 put
try:
queue.put_nowait("Item 1") # ✅ 成功
queue.put_nowait("Item 2") # ✅ 成功
queue.put_nowait("Item 3") # ❌ 會拋出 Full 異常
except Full:
print("佇列已滿")
# 非阻塞 get
try:
item1 = queue.get_nowait() # ✅ 成功
item2 = queue.get_nowait() # ✅ 成功
item3 = queue.get_nowait() # ❌ 會拋出 Empty 異常
except Empty:
print("佇列為空")4. empty() 和 full() - 檢查狀態
from multiprocessing import Queue
queue = Queue(maxsize=3)
print(f"是否為空: {queue.empty()}") # True
queue.put("Item 1")
queue.put("Item 2")
queue.put("Item 3")
print(f"是否已滿: {queue.full()}") # True
print(f"佇列大小: {queue.qsize()}") # 3🏭 實際案例 1:任務處理系統
from multiprocessing import Process, Queue
import time
import random
def task_generator(task_queue, num_tasks):
"""任務產生器"""
for i in range(num_tasks):
task = {
'id': i,
'type': random.choice(['CPU', 'I/O', 'Network']),
'duration': random.uniform(0.5, 2.0)
}
task_queue.put(task)
print(f"[產生器] 創建任務 {i}: {task['type']}")
time.sleep(0.2)
# 發送停止信號
for _ in range(3): # 3 個 Worker
task_queue.put(None)
def worker(task_queue, worker_id, result_queue):
"""工作者"""
processed = 0
while True:
task = task_queue.get()
if task is None: # 停止信號
print(f"[Worker {worker_id}] 收到停止信號,處理了 {processed} 個任務")
break
print(f"[Worker {worker_id}] 處理任務 {task['id']} ({task['type']})")
time.sleep(task['duration'])
# 記錄結果
result = {
'task_id': task['id'],
'worker_id': worker_id,
'status': 'completed'
}
result_queue.put(result)
processed += 1
def result_collector(result_queue, num_tasks):
"""結果收集器"""
results = []
for _ in range(num_tasks):
result = result_queue.get()
results.append(result)
print(f"[收集器] 任務 {result['task_id']} 完成 (Worker {result['worker_id']})")
print(f"\n[收集器] 總共完成 {len(results)} 個任務")
# 主程式
if __name__ == '__main__':
task_queue = Queue()
result_queue = Queue()
num_tasks = 10
# 創建 Process
generator = Process(target=task_generator, args=(task_queue, num_tasks))
workers = [
Process(target=worker, args=(task_queue, i, result_queue))
for i in range(3)
]
collector = Process(target=result_collector, args=(result_queue, num_tasks))
# 啟動
generator.start()
collector.start()
for w in workers:
w.start()
# 等待完成
generator.join()
for w in workers:
w.join()
collector.join()🌐 實際案例 2:網頁爬蟲系統
from multiprocessing import Process, Queue
import time
import random
def url_generator(url_queue):
"""URL 產生器"""
urls = [
'https://example.com/page1',
'https://example.com/page2',
'https://example.com/page3',
'https://example.com/page4',
'https://example.com/page5',
]
for url in urls:
url_queue.put(url)
print(f"[產生器] 添加 URL: {url}")
# 停止信號
for _ in range(2): # 2 個爬蟲
url_queue.put(None)
def crawler(url_queue, crawler_id, data_queue):
"""爬蟲 Worker"""
while True:
url = url_queue.get()
if url is None:
print(f"[爬蟲 {crawler_id}] 完成")
break
print(f"[爬蟲 {crawler_id}] 爬取: {url}")
# 模擬爬取
time.sleep(random.uniform(1, 2))
# 模擬提取資料
data = {
'url': url,
'title': f'Page Title from {url}',
'content': f'Content from {url}',
'links': random.randint(5, 20)
}
data_queue.put(data)
print(f"[爬蟲 {crawler_id}] 完成: {url}")
def data_processor(data_queue, num_pages):
"""資料處理器"""
all_data = []
for _ in range(num_pages):
data = data_queue.get()
all_data.append(data)
print(f"[處理器] 處理資料: {data['title']} ({data['links']} 個連結)")
print(f"\n[處理器] 總共處理 {len(all_data)} 個頁面")
# 主程式
if __name__ == '__main__':
url_queue = Queue()
data_queue = Queue()
num_pages = 5
# 創建 Process
generator = Process(target=url_generator, args=(url_queue,))
crawlers = [
Process(target=crawler, args=(url_queue, i, data_queue))
for i in range(2)
]
processor = Process(target=data_processor, args=(data_queue, num_pages))
# 啟動
generator.start()
processor.start()
for c in crawlers:
c.start()
# 等待
generator.join()
for c in crawlers:
c.join()
processor.join()📸 實際案例 3:圖片處理管線
from multiprocessing import Process, Queue
import time
import random
def image_loader(input_queue):
"""圖片載入器"""
images = [f'image_{i}.jpg' for i in range(10)]
for img in images:
input_queue.put(img)
print(f"[載入器] 載入: {img}")
time.sleep(0.2)
# 停止信號
for _ in range(2): # 2 個處理器
input_queue.put(None)
def image_processor(input_queue, output_queue, processor_id):
"""圖片處理器"""
while True:
img = input_queue.get()
if img is None:
print(f"[處理器 {processor_id}] 完成")
output_queue.put(None) # 傳遞停止信號
break
print(f"[處理器 {processor_id}] 處理: {img}")
# 模擬處理(調整大小、濾鏡等)
time.sleep(random.uniform(0.5, 1.5))
processed_img = {
'original': img,
'processed': f'processed_{img}',
'size': (800, 600)
}
output_queue.put(processed_img)
def image_saver(output_queue, num_images):
"""圖片儲存器"""
saved_count = 0
stop_signals = 0
while True:
item = output_queue.get()
if item is None:
stop_signals += 1
if stop_signals == 2: # 兩個處理器都停止了
break
continue
print(f"[儲存器] 儲存: {item['processed']}")
saved_count += 1
print(f"\n[儲存器] 總共儲存 {saved_count} 張圖片")
# 主程式
if __name__ == '__main__':
input_queue = Queue()
output_queue = Queue()
num_images = 10
# 創建 Process
loader = Process(target=image_loader, args=(input_queue,))
processors = [
Process(target=image_processor, args=(input_queue, output_queue, i))
for i in range(2)
]
saver = Process(target=image_saver, args=(output_queue, num_images))
# 啟動
loader.start()
saver.start()
for p in processors:
p.start()
# 等待
loader.join()
for p in processors:
p.join()
saver.join()⚖️ Queue 的大小限制
from multiprocessing import Queue, Process
from queue import Full
import time
def producer(queue):
"""生產者:快速產生資料"""
for i in range(10):
try:
queue.put(f"Item-{i}", timeout=1) # 最多等 1 秒
print(f"[生產者] 放入: Item-{i}")
except Full:
print(f"[生產者] 佇列已滿,無法放入 Item-{i}")
time.sleep(0.1)
def slow_consumer(queue):
"""消費者:緩慢消費"""
time.sleep(2) # 延遲開始
while True:
try:
item = queue.get(timeout=1)
print(f"[消費者] 取出: {item}")
time.sleep(0.5) # 慢速處理
except:
break
# 限制 Queue 大小為 3
queue = Queue(maxsize=3)
p1 = Process(target=producer, args=(queue,))
p2 = Process(target=slow_consumer, args=(queue,))
p1.start()
p2.start()
p1.join()
p2.join()輸出:
[生產者] 放入: Item-0
[生產者] 放入: Item-1
[生產者] 放入: Item-2
[生產者] 佇列已滿,無法放入 Item-3 ← 佇列滿了
[生產者] 佇列已滿,無法放入 Item-4
(等待 2 秒...)
[消費者] 取出: Item-0
[生產者] 放入: Item-5 ← 有空間了
...🚨 Queue 的注意事項
注意事項 1:JoinableQueue 與任務追蹤
from multiprocessing import Process, JoinableQueue
import time
def worker(queue):
"""Worker"""
while True:
item = queue.get()
if item is None:
queue.task_done() # 標記完成
break
print(f"[Worker] 處理: {item}")
time.sleep(1)
queue.task_done() # ✅ 標記任務完成
# 使用 JoinableQueue
queue = JoinableQueue()
# 啟動 Worker
w = Process(target=worker, args=(queue,))
w.start()
# 添加任務
for i in range(5):
queue.put(f"Task-{i}")
# 等待所有任務完成
queue.join() # ⏸️ 阻塞,直到所有任務都被 task_done()
print("所有任務完成")
# 停止 Worker
queue.put(None)
w.join()注意事項 2:避免死鎖
from multiprocessing import Process, Queue
def problematic_producer(queue):
# ❌ 危險:無限產生資料
for i in range(10000):
queue.put(f"Item-{i}") # 可能阻塞
# 💀 如果消費者沒有及時取出資料,生產者可能死鎖
# ✅ 解決方案:設置超時或使用大容量 Queue
queue = Queue(maxsize=10000)注意事項 3:序列化問題
from multiprocessing import Queue
import threading
# ❌ 不能放入不可序列化的對象
queue = Queue()
# 例如:Thread 對象不能序列化
t = threading.Thread(target=lambda: None)
# queue.put(t) # 會失敗
# ✅ 只放入可序列化的對象
queue.put("String") # ✅
queue.put(123) # ✅
queue.put([1, 2, 3]) # ✅
queue.put({'key': 'value'}) # ✅🔄 Queue vs Pipe
| 特性 | Queue | Pipe |
|---|---|---|
| 通訊模式 | 多對多 | 點對點 |
| 線程安全 | 是 | 否 |
| 速度 | 較慢 | 更快 |
| FIFO | 是 | 是 |
| 大小限制 | 可設置 | 無 |
| 使用場景 | 任務分配 | 簡單通訊 |
# Queue:適合任務分配、生產者-消費者
from multiprocessing import Queue
queue = Queue()
# Pipe:適合點對點快速通訊
from multiprocessing import Pipe
conn1, conn2 = Pipe()💡 最佳實踐
1. 使用毒丸(Poison Pill)模式
from multiprocessing import Process, Queue
def worker(queue):
while True:
item = queue.get()
if item is None: # 毒丸:停止信號
print("[Worker] 收到停止信號")
break
# 處理任務
print(f"[Worker] 處理: {item}")
queue = Queue()
w = Process(target=worker, args=(queue,))
w.start()
# 添加任務
for i in range(5):
queue.put(f"Task-{i}")
# 發送停止信號
queue.put(None) # 毒丸
w.join()2. 適當設置 Queue 大小
from multiprocessing import Queue
# ❌ 不好:無限大小(可能耗盡記憶體)
queue = Queue()
# ✅ 好:設置合理的大小
queue = Queue(maxsize=100)
# 💡 建議:根據記憶體和任務大小決定3. 使用 timeout 避免永久阻塞
from multiprocessing import Queue
from queue import Empty
queue = Queue()
try:
item = queue.get(timeout=5) # 最多等 5 秒
except Empty:
print("超時,佇列為空")✅ 重點回顧
Queue 的特性:
- 多對多通訊
- FIFO(先進先出)
- 線程安全、Process 安全
- 支援大小限制
- 阻塞式操作
Queue 的方法:
put(item)- 放入資料(阻塞)get()- 取出資料(阻塞)put_nowait(item)- 非阻塞放入get_nowait()- 非阻塞取出qsize()- 取得佇列大小empty()/full()- 檢查狀態
適用場景:
- ✅ 生產者-消費者模式
- ✅ 任務分配系統
- ✅ 工作佇列
- ✅ 資料處理管線
- ❌ 點對點快速通訊(使用 Pipe)
注意事項:
- 使用毒丸模式停止 Worker
- 設置合理的大小限制
- 使用 timeout 避免死鎖
- 只放入可序列化的對象
上一篇: 03-2. Pipe 詳解 下一篇: 03-4. Shared Memory 詳解
最後更新:2025-01-06