03-3. Message Queue(消息佇列)詳解

⏱️ 閱讀時間: 15 分鐘 🎯 難度: ⭐⭐⭐ (中等)


🤔 一句話解釋

Message Queue(消息佇列)是一種多對多的 IPC 機制,允許多個生產者和消費者通過 FIFO 佇列交換訊息。


📨 用郵件系統來比喻

生產者(發件人)          消息佇列(郵局)         消費者(收件人)
     │                       ┌──────┐                 │
     ├──── 訊息 1 ──────→  │訊息 1 │ ────→ 消費者 A
     │                       ├──────┤                 │
     ├──── 訊息 2 ──────→  │訊息 2 │ ────→ 消費者 B
     │                       ├──────┤                 │
     └──── 訊息 3 ──────→  │訊息 3 │ ────→ 消費者 C
                             └──────┘

特點:
- 訊息排隊等待處理
- 先進先出(FIFO)
- 支援多個發送者和接收者
- 非同步通訊

🎯 Queue 的基本用法

1. 簡單的生產者-消費者

from multiprocessing import Process, Queue
import time

def producer(queue):
    """生產者:產生資料"""
    for i in range(5):
        item = f"Item-{i}"
        queue.put(item)
        print(f"[生產者] 生產: {item}")
        time.sleep(0.5)

    print("[生產者] 完成生產")

def consumer(queue):
    """消費者:消費資料"""
    while True:
        try:
            item = queue.get(timeout=2)  # 最多等 2 秒
            print(f"[消費者] 消費: {item}")
            time.sleep(1)
        except:
            print("[消費者] 佇列為空,停止消費")
            break

# 創建 Queue
queue = Queue()

# 創建 Process
p1 = Process(target=producer, args=(queue,))
p2 = Process(target=consumer, args=(queue,))

p1.start()
p2.start()
p1.join()
p2.join()

輸出:

[生產者] 生產: Item-0
[消費者] 消費: Item-0
[生產者] 生產: Item-1
[生產者] 生產: Item-2
[消費者] 消費: Item-1
[生產者] 生產: Item-3
[生產者] 生產: Item-4
[生產者] 完成生產
[消費者] 消費: Item-2
[消費者] 消費: Item-3
[消費者] 消費: Item-4
[消費者] 佇列為空,停止消費

2. 多個生產者,多個消費者

from multiprocessing import Process, Queue
import time
import random

def producer(queue, producer_id):
    """生產者"""
    for i in range(3):
        item = f"P{producer_id}-Item{i}"
        queue.put(item)
        print(f"[生產者 {producer_id}] 生產: {item}")
        time.sleep(random.uniform(0.1, 0.5))

def consumer(queue, consumer_id):
    """消費者"""
    while True:
        try:
            item = queue.get(timeout=2)
            print(f"[消費者 {consumer_id}] 消費: {item}")
            time.sleep(random.uniform(0.5, 1))
        except:
            print(f"[消費者 {consumer_id}] 停止")
            break

# 創建 Queue
queue = Queue()

# 創建 3 個生產者
producers = [
    Process(target=producer, args=(queue, i))
    for i in range(3)
]

# 創建 2 個消費者
consumers = [
    Process(target=consumer, args=(queue, i))
    for i in range(2)
]

# 啟動所有 Process
for p in producers + consumers:
    p.start()

# 等待所有 Process 完成
for p in producers + consumers:
    p.join()

輸出:

[生產者 0] 生產: P0-Item0
[生產者 1] 生產: P1-Item0
[生產者 2] 生產: P2-Item0
[消費者 0] 消費: P0-Item0
[消費者 1] 消費: P1-Item0
[生產者 0] 生產: P0-Item1
[生產者 1] 生產: P1-Item1
[消費者 0] 消費: P2-Item0
[生產者 2] 生產: P2-Item1
...

📊 Queue 的方法

1. put() - 放入資料

from multiprocessing import Queue

queue = Queue()

# 放入資料
queue.put("Hello")
queue.put(123)
queue.put({'key': 'value'})
queue.put([1, 2, 3])

print(f"佇列大小: {queue.qsize()}")
# 輸出:佇列大小: 4

2. get() - 取出資料

from multiprocessing import Queue

queue = Queue()
queue.put("First")
queue.put("Second")

# 取出資料(FIFO)
item1 = queue.get()  # "First"
item2 = queue.get()  # "Second"

print(item1, item2)
# 輸出:First Second

3. put_nowait() 和 get_nowait() - 非阻塞

from multiprocessing import Queue
from queue import Empty, Full

queue = Queue(maxsize=2)  # 限制大小為 2

# 非阻塞 put
try:
    queue.put_nowait("Item 1")  # ✅ 成功
    queue.put_nowait("Item 2")  # ✅ 成功
    queue.put_nowait("Item 3")  # ❌ 會拋出 Full 異常
except Full:
    print("佇列已滿")

# 非阻塞 get
try:
    item1 = queue.get_nowait()  # ✅ 成功
    item2 = queue.get_nowait()  # ✅ 成功
    item3 = queue.get_nowait()  # ❌ 會拋出 Empty 異常
except Empty:
    print("佇列為空")

4. empty() 和 full() - 檢查狀態

from multiprocessing import Queue

queue = Queue(maxsize=3)

print(f"是否為空: {queue.empty()}")  # True

queue.put("Item 1")
queue.put("Item 2")
queue.put("Item 3")

print(f"是否已滿: {queue.full()}")   # True
print(f"佇列大小: {queue.qsize()}")  # 3

🏭 實際案例 1:任務處理系統

from multiprocessing import Process, Queue
import time
import random

def task_generator(task_queue, num_tasks):
    """任務產生器"""
    for i in range(num_tasks):
        task = {
            'id': i,
            'type': random.choice(['CPU', 'I/O', 'Network']),
            'duration': random.uniform(0.5, 2.0)
        }
        task_queue.put(task)
        print(f"[產生器] 創建任務 {i}: {task['type']}")
        time.sleep(0.2)

    # 發送停止信號
    for _ in range(3):  # 3 個 Worker
        task_queue.put(None)

def worker(task_queue, worker_id, result_queue):
    """工作者"""
    processed = 0
    while True:
        task = task_queue.get()

        if task is None:  # 停止信號
            print(f"[Worker {worker_id}] 收到停止信號,處理了 {processed} 個任務")
            break

        print(f"[Worker {worker_id}] 處理任務 {task['id']} ({task['type']})")
        time.sleep(task['duration'])

        # 記錄結果
        result = {
            'task_id': task['id'],
            'worker_id': worker_id,
            'status': 'completed'
        }
        result_queue.put(result)
        processed += 1

def result_collector(result_queue, num_tasks):
    """結果收集器"""
    results = []
    for _ in range(num_tasks):
        result = result_queue.get()
        results.append(result)
        print(f"[收集器] 任務 {result['task_id']} 完成 (Worker {result['worker_id']})")

    print(f"\n[收集器] 總共完成 {len(results)} 個任務")

# 主程式
if __name__ == '__main__':
    task_queue = Queue()
    result_queue = Queue()
    num_tasks = 10

    # 創建 Process
    generator = Process(target=task_generator, args=(task_queue, num_tasks))
    workers = [
        Process(target=worker, args=(task_queue, i, result_queue))
        for i in range(3)
    ]
    collector = Process(target=result_collector, args=(result_queue, num_tasks))

    # 啟動
    generator.start()
    collector.start()
    for w in workers:
        w.start()

    # 等待完成
    generator.join()
    for w in workers:
        w.join()
    collector.join()

🌐 實際案例 2:網頁爬蟲系統

from multiprocessing import Process, Queue
import time
import random

def url_generator(url_queue):
    """URL 產生器"""
    urls = [
        'https://example.com/page1',
        'https://example.com/page2',
        'https://example.com/page3',
        'https://example.com/page4',
        'https://example.com/page5',
    ]

    for url in urls:
        url_queue.put(url)
        print(f"[產生器] 添加 URL: {url}")

    # 停止信號
    for _ in range(2):  # 2 個爬蟲
        url_queue.put(None)

def crawler(url_queue, crawler_id, data_queue):
    """爬蟲 Worker"""
    while True:
        url = url_queue.get()

        if url is None:
            print(f"[爬蟲 {crawler_id}] 完成")
            break

        print(f"[爬蟲 {crawler_id}] 爬取: {url}")

        # 模擬爬取
        time.sleep(random.uniform(1, 2))

        # 模擬提取資料
        data = {
            'url': url,
            'title': f'Page Title from {url}',
            'content': f'Content from {url}',
            'links': random.randint(5, 20)
        }

        data_queue.put(data)
        print(f"[爬蟲 {crawler_id}] 完成: {url}")

def data_processor(data_queue, num_pages):
    """資料處理器"""
    all_data = []

    for _ in range(num_pages):
        data = data_queue.get()
        all_data.append(data)
        print(f"[處理器] 處理資料: {data['title']} ({data['links']} 個連結)")

    print(f"\n[處理器] 總共處理 {len(all_data)} 個頁面")

# 主程式
if __name__ == '__main__':
    url_queue = Queue()
    data_queue = Queue()
    num_pages = 5

    # 創建 Process
    generator = Process(target=url_generator, args=(url_queue,))
    crawlers = [
        Process(target=crawler, args=(url_queue, i, data_queue))
        for i in range(2)
    ]
    processor = Process(target=data_processor, args=(data_queue, num_pages))

    # 啟動
    generator.start()
    processor.start()
    for c in crawlers:
        c.start()

    # 等待
    generator.join()
    for c in crawlers:
        c.join()
    processor.join()

📸 實際案例 3:圖片處理管線

from multiprocessing import Process, Queue
import time
import random

def image_loader(input_queue):
    """圖片載入器"""
    images = [f'image_{i}.jpg' for i in range(10)]

    for img in images:
        input_queue.put(img)
        print(f"[載入器] 載入: {img}")
        time.sleep(0.2)

    # 停止信號
    for _ in range(2):  # 2 個處理器
        input_queue.put(None)

def image_processor(input_queue, output_queue, processor_id):
    """圖片處理器"""
    while True:
        img = input_queue.get()

        if img is None:
            print(f"[處理器 {processor_id}] 完成")
            output_queue.put(None)  # 傳遞停止信號
            break

        print(f"[處理器 {processor_id}] 處理: {img}")

        # 模擬處理(調整大小、濾鏡等)
        time.sleep(random.uniform(0.5, 1.5))

        processed_img = {
            'original': img,
            'processed': f'processed_{img}',
            'size': (800, 600)
        }

        output_queue.put(processed_img)

def image_saver(output_queue, num_images):
    """圖片儲存器"""
    saved_count = 0
    stop_signals = 0

    while True:
        item = output_queue.get()

        if item is None:
            stop_signals += 1
            if stop_signals == 2:  # 兩個處理器都停止了
                break
            continue

        print(f"[儲存器] 儲存: {item['processed']}")
        saved_count += 1

    print(f"\n[儲存器] 總共儲存 {saved_count} 張圖片")

# 主程式
if __name__ == '__main__':
    input_queue = Queue()
    output_queue = Queue()
    num_images = 10

    # 創建 Process
    loader = Process(target=image_loader, args=(input_queue,))
    processors = [
        Process(target=image_processor, args=(input_queue, output_queue, i))
        for i in range(2)
    ]
    saver = Process(target=image_saver, args=(output_queue, num_images))

    # 啟動
    loader.start()
    saver.start()
    for p in processors:
        p.start()

    # 等待
    loader.join()
    for p in processors:
        p.join()
    saver.join()

⚖️ Queue 的大小限制

from multiprocessing import Queue, Process
from queue import Full
import time

def producer(queue):
    """生產者:快速產生資料"""
    for i in range(10):
        try:
            queue.put(f"Item-{i}", timeout=1)  # 最多等 1 秒
            print(f"[生產者] 放入: Item-{i}")
        except Full:
            print(f"[生產者] 佇列已滿,無法放入 Item-{i}")
        time.sleep(0.1)

def slow_consumer(queue):
    """消費者:緩慢消費"""
    time.sleep(2)  # 延遲開始
    while True:
        try:
            item = queue.get(timeout=1)
            print(f"[消費者] 取出: {item}")
            time.sleep(0.5)  # 慢速處理
        except:
            break

# 限制 Queue 大小為 3
queue = Queue(maxsize=3)

p1 = Process(target=producer, args=(queue,))
p2 = Process(target=slow_consumer, args=(queue,))

p1.start()
p2.start()
p1.join()
p2.join()

輸出:

[生產者] 放入: Item-0
[生產者] 放入: Item-1
[生產者] 放入: Item-2
[生產者] 佇列已滿,無法放入 Item-3  ← 佇列滿了
[生產者] 佇列已滿,無法放入 Item-4
(等待 2 秒...)
[消費者] 取出: Item-0
[生產者] 放入: Item-5  ← 有空間了
...

🚨 Queue 的注意事項

注意事項 1:JoinableQueue 與任務追蹤

from multiprocessing import Process, JoinableQueue
import time

def worker(queue):
    """Worker"""
    while True:
        item = queue.get()

        if item is None:
            queue.task_done()  # 標記完成
            break

        print(f"[Worker] 處理: {item}")
        time.sleep(1)
        queue.task_done()  # ✅ 標記任務完成

# 使用 JoinableQueue
queue = JoinableQueue()

# 啟動 Worker
w = Process(target=worker, args=(queue,))
w.start()

# 添加任務
for i in range(5):
    queue.put(f"Task-{i}")

# 等待所有任務完成
queue.join()  # ⏸️ 阻塞,直到所有任務都被 task_done()

print("所有任務完成")

# 停止 Worker
queue.put(None)
w.join()

注意事項 2:避免死鎖

from multiprocessing import Process, Queue

def problematic_producer(queue):
    # ❌ 危險:無限產生資料
    for i in range(10000):
        queue.put(f"Item-{i}")  # 可能阻塞

# 💀 如果消費者沒有及時取出資料,生產者可能死鎖

# ✅ 解決方案:設置超時或使用大容量 Queue
queue = Queue(maxsize=10000)

注意事項 3:序列化問題

from multiprocessing import Queue
import threading

# ❌ 不能放入不可序列化的對象
queue = Queue()

# 例如:Thread 對象不能序列化
t = threading.Thread(target=lambda: None)
# queue.put(t)  # 會失敗

# ✅ 只放入可序列化的對象
queue.put("String")  # ✅
queue.put(123)       # ✅
queue.put([1, 2, 3]) # ✅
queue.put({'key': 'value'})  # ✅

🔄 Queue vs Pipe

特性QueuePipe
通訊模式多對多點對點
線程安全
速度較慢更快
FIFO
大小限制可設置
使用場景任務分配簡單通訊
# Queue:適合任務分配、生產者-消費者
from multiprocessing import Queue
queue = Queue()

# Pipe:適合點對點快速通訊
from multiprocessing import Pipe
conn1, conn2 = Pipe()

💡 最佳實踐

1. 使用毒丸(Poison Pill)模式

from multiprocessing import Process, Queue

def worker(queue):
    while True:
        item = queue.get()

        if item is None:  # 毒丸:停止信號
            print("[Worker] 收到停止信號")
            break

        # 處理任務
        print(f"[Worker] 處理: {item}")

queue = Queue()

w = Process(target=worker, args=(queue,))
w.start()

# 添加任務
for i in range(5):
    queue.put(f"Task-{i}")

# 發送停止信號
queue.put(None)  # 毒丸

w.join()

2. 適當設置 Queue 大小

from multiprocessing import Queue

# ❌ 不好:無限大小(可能耗盡記憶體)
queue = Queue()

# ✅ 好:設置合理的大小
queue = Queue(maxsize=100)

# 💡 建議:根據記憶體和任務大小決定

3. 使用 timeout 避免永久阻塞

from multiprocessing import Queue
from queue import Empty

queue = Queue()

try:
    item = queue.get(timeout=5)  # 最多等 5 秒
except Empty:
    print("超時,佇列為空")

✅ 重點回顧

Queue 的特性:

  • 多對多通訊
  • FIFO(先進先出)
  • 線程安全、Process 安全
  • 支援大小限制
  • 阻塞式操作

Queue 的方法:

  • put(item) - 放入資料(阻塞)
  • get() - 取出資料(阻塞)
  • put_nowait(item) - 非阻塞放入
  • get_nowait() - 非阻塞取出
  • qsize() - 取得佇列大小
  • empty() / full() - 檢查狀態

適用場景:

  • ✅ 生產者-消費者模式
  • ✅ 任務分配系統
  • ✅ 工作佇列
  • ✅ 資料處理管線
  • ❌ 點對點快速通訊(使用 Pipe)

注意事項:

  • 使用毒丸模式停止 Worker
  • 設置合理的大小限制
  • 使用 timeout 避免死鎖
  • 只放入可序列化的對象

上一篇: 03-2. Pipe 詳解 下一篇: 03-4. Shared Memory 詳解


最後更新:2025-01-06

0%