10-2. MQTT 協定詳解
深入理解 MQTT 的發布訂閱模式、QoS 機制與實戰應用
🚀 MQTT 協定詳解
MQTT 在網路模型中的位置
┌──────────────────────────────────────────────────────────┐
│ OSI 七層模型 TCP/IP 四層模型 │
├──────────────────────────────────────────────────────────┤
│ 7. 應用層 (Application) │
│ ├─ MQTT ───────────────┐ 應用層 (Application) │
│ │ (MQTT, HTTP, STOMP...) │
├─────────────────────────────┤ │
│ 6. 表現層 (Presentation) │ │
├─────────────────────────────┤ │
│ 5. 會話層 (Session) │ │
├─────────────────────────────┼─────────────────────────────┤
│ 4. 傳輸層 (Transport) │ 傳輸層 (Transport) │
│ └─ TCP ─────────────────┘ (TCP) │
├─────────────────────────────┼─────────────────────────────┤
│ 3. 網路層 (Network) │ 網際網路層 (Internet) │
│ └─ IP │ (IP, ICMP, ARP) │
├─────────────────────────────┼─────────────────────────────┤
│ 2. 資料連結層 (Data Link) │ 網路存取層 │
│ 1. 實體層 (Physical) │ (Network Access) │
└─────────────────────────────┴─────────────────────────────┘
📍 位置:OSI Layer 7(應用層)/ TCP/IP Layer 4(應用層)
🔌 Port:1883(未加密)/ 8883(MQTT over SSL/TLS)
🚛 傳輸協定:TCP為什麼 MQTT 用 TCP?
| 原因 | 說明 |
|---|---|
| 可靠傳輸 🎯 | IoT 裝置的指令不能遺失(開燈指令不能漏掉) |
| 連線保持 🔗 | MQTT 使用長連接(Persistent Connection),減少重複握手 |
| QoS 機制 ✅ | 需要 TCP 的順序保證和確認機制來實現 QoS 1/2 |
| 低頻寬友善 💾 | TCP 雖有額外開銷,但 MQTT Header 只有 2 bytes(超輕量) |
💡 特殊情況:MQTT-SN (MQTT for Sensor Networks) 可用 UDP,專為更低功耗場景設計
🎯 什麼是 MQTT?
💡 比喻:訂閱制雜誌服務
你訂閱了「科技雜誌」,出版社(Broker)會自動把新刊物送到你家
你不需要每天打電話問:「有新的雜誌嗎?」
其他訂閱同樣雜誌的人,也會同時收到
Publisher(出版社)→ Broker(郵局)→ Subscriber(訂閱者)MQTT(Message Queuing Telemetry Transport) 是一種輕量級的發布/訂閱式訊息傳輸協定,專為低頻寬、高延遲、不可靠網路環境設計。
為什麼需要 MQTT?
問題場景: 假設你有 1000 個溫度感測器,每分鐘回報一次數據…
❌ HTTP 輪詢(Polling):
# 每個感測器每分鐘發送 HTTP 請求
while True:
response = requests.post('http://server.com/temperature', data={'temp': 25.5})
time.sleep(60)
# 問題:
# 1. 每次都要建立 TCP 連線(三次握手)
# 2. HTTP Header 很大(幾百 bytes)
# 3. 浪費頻寬和電力
# 4. 伺服器需要同時處理 1000 個連線✅ MQTT:
# 感測器連線一次,之後只傳輸數據
client.connect("mqtt.server.com", 1883)
while True:
client.publish("sensor/temp", "25.5") # 只傳輸 7 bytes!
time.sleep(60)
# 優點:
# 1. 保持長連線(TCP keep-alive)
# 2. 極小的封包(header 只需 2 bytes)
# 3. 省電省流量(適合電池供電裝置)
# 4. Broker 統一管理🏗️ MQTT 架構
核心組件
┌─────────────┐
│ Publisher │ ──publish──> ┌────────┐ <──subscribe── ┌────────────┐
│ (發布者) │ │ Broker │ │ Subscriber │
└─────────────┘ │(中介者)│ │ (訂閱者) │
└────────┘ └────────────┘
↓
儲存 & 轉發訊息1️⃣ Broker(訊息中介者)
💡 比喻:郵局總部
所有的信都先送到郵局,郵局再根據地址分發給訂戶功能:
- 接收發布者的訊息
- 管理訂閱者的訂閱清單
- 轉發訊息給訂閱者
- 儲存離線訊息(Retained Messages)
- 處理客戶端斷線(Last Will)
常見 Broker 軟體:
- Mosquitto:開源、輕量、易於安裝
- EMQX:支援高併發(百萬級連線)
- HiveMQ:商業版,提供叢集和管理介面
- AWS IoT Core:雲端託管服務
安裝 Mosquitto:
# Ubuntu/Debian
sudo apt-get install mosquitto mosquitto-clients
# macOS
brew install mosquitto
# 啟動 Broker
mosquitto -v # -v 顯示詳細日誌2️⃣ Publisher(發布者)
import paho.mqtt.client as mqtt
# 建立客戶端
client = mqtt.Client(client_id="temperature_sensor_001")
# 連線到 Broker
client.connect("mqtt.example.com", 1883, 60)
# 發布訊息
client.publish(
topic="home/living-room/temperature",
payload="25.5",
qos=1,
retain=True
)
client.disconnect()3️⃣ Subscriber(訂閱者)
import paho.mqtt.client as mqtt
def on_connect(client, userdata, flags, rc):
print(f"連線結果:{rc}")
# 訂閱主題
client.subscribe("home/+/temperature", qos=1) # + 是萬用字元
def on_message(client, userdata, msg):
print(f"收到訊息:{msg.topic} = {msg.payload.decode()}")
client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
client.connect("mqtt.example.com", 1883, 60)
client.loop_forever() # 持續監聽📂 Topic(主題)與 Wildcards(萬用字元)
Topic 命名規則
💡 比喻:檔案系統路徑
home/living-room/temperature 就像 /home/user/documents/file.txt
使用 / 來分層,方便組織和訂閱範例:
home/living-room/temperature
home/living-room/humidity
home/bedroom/temperature
home/bedroom/light/statusWildcards(萬用字元)
1. +(單層萬用字元)
# 訂閱所有房間的溫度
client.subscribe("home/+/temperature")
# 會收到:
# home/living-room/temperature ✅
# home/bedroom/temperature ✅
# home/kitchen/temperature ✅
# 不會收到:
# home/bedroom/light/status ❌(超過一層)2. #(多層萬用字元)
# 訂閱 home 下的所有訊息
client.subscribe("home/#")
# 會收到:
# home/living-room/temperature ✅
# home/bedroom/light/status ✅
# home/kitchen/fridge/door/open ✅(多層也可以)
# 注意:# 必須是最後一個字元
client.subscribe("home/#/temperature") # ❌ 錯誤!實際應用範例
# 智慧家居系統
# 訂閱所有感測器數據
client.subscribe("home/+/sensor/#")
# 訂閱所有告警
client.subscribe("home/+/alert")
# 訂閱特定房間的所有裝置
client.subscribe("home/living-room/#")🎚️ QoS(服務品質等級)
💡 比喻:不同的快遞服務
QoS 0:平信(寄了就算,可能遺失)
QoS 1:掛號信(保證送達,但可能重複收到)
QoS 2:保價快遞(保證送達且只有一份,最可靠但最慢)QoS 0:At most once(最多一次)
Publisher → Broker → Subscriber
發送 ──────────────> 收到(可能遺失)特性:
- ❌ 不確認、不重傳
- ⚡ 最快、最省頻寬
- 📉 可能遺失訊息
適用場景:
# 溫度監控(遺失一筆沒關係,下次會再傳)
client.publish("sensor/temp", "25.5", qos=0)
# 即時位置(GPS 座標每秒更新,舊資料無意義)
client.publish("gps/location", "25.033,121.565", qos=0)QoS 1:At least once(至少一次)
Publisher → Broker → Subscriber
↓ ↓
PUBLISH PUBACK(確認)
↓
重傳(如果沒收到 PUBACK)特性:
- ✅ 保證送達
- ⚠️ 可能重複收到
- 🔄 需要確認機制(PUBACK)
適用場景:
# 告警通知(不能遺失,重複可以去重)
client.publish("alert/fire", "Kitchen fire detected!", qos=1)
# 使用者操作(開燈、關門)
client.publish("home/light/living-room", "ON", qos=1)處理重複訊息:
received_messages = set() # 用於去重
def on_message(client, userdata, msg):
message_id = msg.mid # Message ID
if message_id in received_messages:
print("重複訊息,忽略")
return
received_messages.add(message_id)
print(f"處理訊息:{msg.payload.decode()}")QoS 2:Exactly once(恰好一次)
Publisher → Broker → Subscriber
PUBLISH ────────>
<──── PUBREC(收到)
PUBREL ────────>(釋放)
<──── PUBCOMP(完成)特性:
- ✅✅ 保證送達且不重複
- 🐢 最慢(四次握手)
- 💾 需要儲存狀態
適用場景:
# 金流交易(絕對不能重複扣款)
client.publish("payment/debit", json.dumps({
'user_id': 'user_001',
'amount': 1000
}), qos=2)
# 庫存扣減(不能重複扣庫存)
client.publish("inventory/reduce", json.dumps({
'product_id': 'P001',
'quantity': 1
}), qos=2)QoS 等級比較
| QoS | 訊息保證 | 速度 | 頻寬 | 適用場景 |
|---|---|---|---|---|
| 0 | 可能遺失 | 最快 ⚡ | 最省 | 感測器數據、GPS |
| 1 | 至少一次 | 中等 | 中等 | 告警、控制指令 |
| 2 | 恰好一次 | 最慢 🐢 | 最多 | 金流、庫存 |
記憶口訣:「零忘一重二恰好」
🔖 Retained Message(保留訊息)
💡 比喻:布告欄上的公告
新同學加入群組時,可以直接看到布告欄上的最新公告
不用等到下次有人發言功能: Broker 會儲存每個主題的最後一則 Retained 訊息,新訂閱者連線時會立即收到。
範例:
# 發布者:發布裝置狀態(保留訊息)
client.publish("device/status", "online", retain=True)
# 當裝置離線時
client.publish("device/status", "offline", retain=True)# 訂閱者:剛連線就能知道裝置狀態
def on_connect(client, userdata, flags, rc):
client.subscribe("device/status")
def on_message(client, userdata, msg):
print(f"裝置狀態:{msg.payload.decode()}") # 立即收到 "offline"應用場景:
- 裝置狀態(online/offline)
- 最新配置(溫度設定值)
- 儀表板初始值(當前溫度、濕度)
清除 Retained 訊息:
# 發布空訊息來清除
client.publish("device/status", None, retain=True)⚰️ Last Will and Testament(遺囑訊息)
💡 比喻:遺囑
如果我意外死亡(斷線),請幫我告訴家人(訂閱者)
「我不在了」這個消息功能: 當客戶端異常斷線時(沒有正常發送 DISCONNECT),Broker 會自動發送預先設定的遺囑訊息。
設定遺囑:
import paho.mqtt.client as mqtt
client = mqtt.Client()
# 設定遺囑訊息(連線前設定)
client.will_set(
topic="device/sensor_001/status",
payload="offline",
qos=1,
retain=True
)
client.connect("mqtt.example.com", 1883, 60)
# 正常運行...
client.publish("sensor/temp", "25.5")
# 如果異常斷線(網路中斷、程式崩潰)
# Broker 會自動發送:device/sensor_001/status = "offline"監控裝置狀態:
# 監控系統訂閱所有裝置狀態
def on_message(client, userdata, msg):
if msg.topic.endswith("/status"):
device_id = msg.topic.split('/')[1]
status = msg.payload.decode()
if status == "offline":
print(f"⚠️ 裝置 {device_id} 已斷線!")
# 發送告警通知
send_alert(f"Device {device_id} is offline")
client.subscribe("device/+/status")
client.on_message = on_message
client.loop_forever()完整生命週期範例:
import paho.mqtt.client as mqtt
import time
def on_connect(client, userdata, flags, rc):
print("已連線")
# 連線成功後發送 online 狀態
client.publish("device/sensor_001/status", "online", retain=True)
client = mqtt.Client()
client.on_connect = on_connect
# 設定遺囑:如果異常斷線,自動發送 offline
client.will_set("device/sensor_001/status", "offline", qos=1, retain=True)
client.connect("mqtt.example.com", 1883, 60)
client.loop_start()
try:
while True:
# 發送感測器數據
client.publish("sensor/temp", "25.5")
time.sleep(60)
except KeyboardInterrupt:
# 正常退出時,手動發送 offline
client.publish("device/sensor_001/status", "offline", retain=True)
client.disconnect()🔐 MQTT 安全性
1️⃣ 傳輸加密(TLS/SSL)
import paho.mqtt.client as mqtt
import ssl
client = mqtt.Client()
# 設定 TLS/SSL
client.tls_set(
ca_certs="ca.crt", # CA 憑證
certfile="client.crt", # 客戶端憑證
keyfile="client.key", # 客戶端私鑰
tls_version=ssl.PROTOCOL_TLSv1_2
)
# 連線到加密端口(通常是 8883)
client.connect("mqtt.example.com", 8883, 60)2️⃣ 身份驗證
# 使用者名稱 + 密碼
client.username_pw_set("username", "password")
client.connect("mqtt.example.com", 1883, 60)Broker 端設定(Mosquitto):
# 建立密碼檔
mosquitto_passwd -c /etc/mosquitto/passwd username
# mosquitto.conf
allow_anonymous false
password_file /etc/mosquitto/passwd3️⃣ 主題存取控制(ACL)
# /etc/mosquitto/acl
# user1 只能發布溫度數據
user user1
topic write sensor/temperature
# user2 只能訂閱數據
user user2
topic read sensor/#🛠️ 完整實作範例
智慧家居系統
溫度感測器(Publisher):
import paho.mqtt.client as mqtt
import time
import random
class TemperatureSensor:
def __init__(self, sensor_id, room):
self.sensor_id = sensor_id
self.room = room
self.client = mqtt.Client(client_id=f"sensor_{sensor_id}")
# 設定遺囑
self.client.will_set(
f"home/{room}/sensor/status",
"offline",
qos=1,
retain=True
)
def on_connect(self, client, userdata, flags, rc):
print(f"感測器 {self.sensor_id} 已連線")
# 發送上線狀態
self.client.publish(
f"home/{self.room}/sensor/status",
"online",
retain=True
)
def start(self):
self.client.on_connect = self.on_connect
self.client.connect("mqtt.example.com", 1883, 60)
self.client.loop_start()
try:
while True:
# 模擬讀取溫度
temp = round(20 + random.uniform(-5, 5), 1)
# 發布溫度數據
self.client.publish(
f"home/{self.room}/temperature",
str(temp),
qos=1
)
print(f"[{self.room}] 溫度:{temp}°C")
# 檢查告警
if temp > 30:
self.client.publish(
f"home/{self.room}/alert",
f"高溫警告!{temp}°C",
qos=1
)
time.sleep(10)
except KeyboardInterrupt:
self.client.publish(
f"home/{self.room}/sensor/status",
"offline",
retain=True
)
self.client.disconnect()
# 啟動感測器
sensor = TemperatureSensor(sensor_id="001", room="living-room")
sensor.start()監控系統(Subscriber):
import paho.mqtt.client as mqtt
import json
from datetime import datetime
class HomeMonitor:
def __init__(self):
self.client = mqtt.Client(client_id="home_monitor")
self.temperature_data = {}
def on_connect(self, client, userdata, flags, rc):
print("監控系統已連線")
# 訂閱所有房間的溫度
self.client.subscribe("home/+/temperature", qos=1)
# 訂閱所有告警
self.client.subscribe("home/+/alert", qos=1)
# 訂閱感測器狀態
self.client.subscribe("home/+/sensor/status", qos=1)
def on_message(self, client, userdata, msg):
topic_parts = msg.topic.split('/')
room = topic_parts[1]
if msg.topic.endswith("/temperature"):
temp = float(msg.payload.decode())
self.temperature_data[room] = {
'value': temp,
'timestamp': datetime.now().isoformat()
}
print(f"📊 [{room}] 溫度:{temp}°C")
elif msg.topic.endswith("/alert"):
alert_msg = msg.payload.decode()
print(f"🚨 [{room}] 告警:{alert_msg}")
self.send_notification(room, alert_msg)
elif msg.topic.endswith("/status"):
status = msg.payload.decode()
if status == "offline":
print(f"⚠️ [{room}] 感測器離線!")
self.send_notification(room, "感測器離線")
else:
print(f"✅ [{room}] 感測器上線")
def send_notification(self, room, message):
# 發送通知(Email、簡訊、LINE 等)
print(f"📧 發送通知:{room} - {message}")
def get_summary(self):
print("\n=== 溫度摘要 ===")
for room, data in self.temperature_data.items():
print(f"{room}: {data['value']}°C ({data['timestamp']})")
def start(self):
self.client.on_connect = self.on_connect
self.client.on_message = self.on_message
self.client.connect("mqtt.example.com", 1883, 60)
self.client.loop_forever()
# 啟動監控
monitor = HomeMonitor()
monitor.start()🎓 常見面試題
Q1:MQTT 為什麼比 HTTP 更適合 IoT?
答案:
| 特性 | HTTP | MQTT |
|---|---|---|
| 連線模式 | 短連線(每次請求建立) | 長連線(保持連線) |
| Header 大小 | 幾百 bytes | 2 bytes |
| 方向 | 單向(請求-回應) | 雙向(發布-訂閱) |
| 頻寬消耗 | 高 | 極低 |
| 電力消耗 | 高(頻繁連線) | 低(保持連線) |
| 即時性 | 需要輪詢 | 即時推送 |
實際數據比較:
發送 "25.5" 這個溫度數據:
HTTP POST:
TCP 握手:3 個封包
HTTP Header:~200 bytes
總共:~500 bytes
MQTT Publish:
已連線狀態
MQTT Header:2 bytes
總共:6 bytes(省 98.8% 流量!)記憶技巧:「MQTT = 省省省(省電、省流量、省資源)」
Q2:QoS 1 和 QoS 2 的主要差異?
答案:
QoS 1:
Publisher → Broker
PUBLISH ──>
<── PUBACK(確認)
問題:如果 PUBACK 遺失,Publisher 會重傳
→ Subscriber 可能收到兩次QoS 2:
Publisher → Broker
PUBLISH ──>
<── PUBREC(已收到)
PUBREL ──>(可以釋放了)
<── PUBCOMP(完成)
保證:四次握手確保 Broker 只轉發一次給 Subscriber選擇建議:
- QoS 1:可容忍重複(用 message ID 去重)→ 告警通知
- QoS 2:絕對不能重複 → 金流、庫存
程式碼範例:
# QoS 1:需要處理重複
received_ids = set()
def on_message_qos1(client, userdata, msg):
if msg.mid in received_ids:
return # 重複訊息,忽略
received_ids.add(msg.mid)
process_alert(msg.payload)
# QoS 2:不需要去重(協定保證)
def on_message_qos2(client, userdata, msg):
deduct_payment(msg.payload) # 直接處理,不會重複Q3:什麼是 Clean Session?
答案:
Clean Session 決定客戶端斷線後,Broker 是否保留訂閱資訊和未送達的訊息。
# Clean Session = True(預設)
client = mqtt.Client(clean_session=True)
# 斷線後:訂閱清除、離線訊息丟棄# Clean Session = False
client = mqtt.Client(client_id="unique_id", clean_session=False)
# 斷線後:保留訂閱、保留 QoS 1/2 訊息比喻:
Clean Session = True:旅館(退房後不保留物品)
Clean Session = False:自己的家(離開後東西還在)應用場景:
# 場景 1:即時數據(不需要離線訊息)
client = mqtt.Client(clean_session=True)
client.subscribe("sensor/temperature")
# 斷線期間的溫度數據不重要(過時了)
# 場景 2:重要通知(需要離線訊息)
client = mqtt.Client(client_id="notification_service", clean_session=False)
client.subscribe("alerts/#", qos=1)
# 斷線期間的告警訊息都要收到重連後的行為:
# Clean Session = False 的客戶端重連後
def on_connect(client, userdata, flags, rc):
if flags['session_present']:
print("恢復舊 session,有離線訊息")
# 不需要重新訂閱,Broker 會立即推送離線訊息
else:
print("新 session,需要重新訂閱")
client.subscribe("topic")Q4:如何實作 MQTT 心跳機制?
答案:
MQTT 內建 Keep Alive 機制,不需要手動實作心跳。
原理:
client.connect("mqtt.example.com", 1883, keepalive=60)
# keepalive=60:每 60 秒沒有訊息就發送 PINGREQ
客戶端 → Broker
(60 秒內沒有 PUBLISH)
PINGREQ ──>(我還活著嗎?)
<── PINGRESP(是的,連線正常)如果 Broker 沒有回應:
def on_disconnect(client, userdata, rc):
if rc != 0:
print("非預期斷線,嘗試重連...")
while True:
try:
client.reconnect()
break
except:
time.sleep(5)自訂心跳(應用層):
import threading
def heartbeat():
while True:
client.publish("device/heartbeat", json.dumps({
'device_id': 'sensor_001',
'timestamp': datetime.now().isoformat(),
'cpu': psutil.cpu_percent(),
'memory': psutil.virtual_memory().percent
}))
time.sleep(30)
# 在背景執行緒中執行
threading.Thread(target=heartbeat, daemon=True).start()Q5:MQTT 如何處理大量訊息?
答案:
多種策略組合:
1. 訊息限流(Rate Limiting):
import time
class RateLimiter:
def __init__(self, max_rate):
self.max_rate = max_rate # 每秒最多幾則
self.last_sent = 0
def publish(self, client, topic, payload):
now = time.time()
if now - self.last_sent < 1 / self.max_rate:
time.sleep(1 / self.max_rate - (now - self.last_sent))
client.publish(topic, payload)
self.last_sent = time.time()
# 使用:每秒最多 10 則訊息
limiter = RateLimiter(max_rate=10)
limiter.publish(client, "topic", "data")2. 訊息批次(Batching):
import json
class MessageBatcher:
def __init__(self, batch_size=10, batch_interval=5):
self.batch = []
self.batch_size = batch_size
self.batch_interval = batch_interval
self.last_sent = time.time()
def add(self, client, topic, data):
self.batch.append(data)
# 達到批次大小或時間間隔,發送
if len(self.batch) >= self.batch_size or \
time.time() - self.last_sent >= self.batch_interval:
self.flush(client, topic)
def flush(self, client, topic):
if self.batch:
client.publish(topic, json.dumps(self.batch))
self.batch = []
self.last_sent = time.time()
# 使用
batcher = MessageBatcher()
for i in range(100):
batcher.add(client, "sensor/batch", {'temp': 25.5, 'id': i})3. 壓縮:
import zlib
import json
def publish_compressed(client, topic, data):
# 序列化並壓縮
json_data = json.dumps(data)
compressed = zlib.compress(json_data.encode())
client.publish(topic, compressed)
def on_message_decompress(client, userdata, msg):
# 解壓縮
decompressed = zlib.decompress(msg.payload)
data = json.loads(decompressed.decode())
print(data)4. Broker 擴展(叢集):
使用 EMQX 或 HiveMQ 叢集版
支援水平擴展,處理百萬級連線
Client A ──┐
Client B ──┼──> Load Balancer ──┬──> Broker 1
Client C ──┘ ├──> Broker 2
└──> Broker 3📝 總結
MQTT 是物聯網世界的「輕量級快遞」:
- 輕量:2 bytes header,省電省流量
- 可靠:QoS 0/1/2 三種等級,按需選擇
- 靈活:Pub/Sub 模式,訂閱者隨時加入
- 智慧:Retained Message 保留最新狀態,Last Will 處理異常
記憶口訣:
- 「輕(輕量)、可(可靠)、靈(靈活)、智(智慧)」
適用場景:
- 🏠 智慧家居(感測器、控制器)
- 🚗 車聯網(即時位置、車況)
- 🏭 工業 4.0(設備監控)
- 📱 行動 App(推送通知)
🔗 延伸閱讀
- 上一篇:05-1. 即時通訊協定概覽
- 下一篇:05-3. XMPP 協定
- 相關文章:04-1. WebSocket 基礎概念