目錄
03-5. Socket 通訊
⏱️ 閱讀時間: 12 分鐘 🎯 難度: ⭐⭐⭐ (中等)
🎯 本篇重點
掌握 Socket 這種最靈活的 IPC 方式,學習如何使用 TCP/UDP 進行 Process 間通訊,甚至跨網路通訊。
🤔 什麼是 Socket?
Socket(套接字) = 網路通訊的端點
一句話解釋: Socket 是一種通訊介面,讓 Process 能夠透過網路協定(TCP/UDP)互相傳遞資料,可用於本機或跨機器通訊。
🏢 用通訊方式比喻
Pipe/Queue = 內線電話
同一棟大樓內的兩個辦公室
├─ 辦公室 A ──→ 內線 ──→ 辦公室 B
└─ 只能在同一棟樓使用Socket = 手機/網路電話
不同地點的辦公室
├─ 台北辦公室 ──→ 網路 ──→ 高雄辦公室
├─ 可以跨城市、跨國家
└─ 使用標準化協定(TCP/UDP)💻 Socket 基礎概念
TCP vs UDP
| 特性 | TCP | UDP |
|---|---|---|
| 連線方式 | 面向連線 | 無連線 |
| 可靠性 | 可靠(保證送達) | 不可靠(可能遺失) |
| 速度 | 較慢 | 較快 |
| 順序 | 保證順序 | 不保證順序 |
| 適用場景 | HTTP、檔案傳輸 | 影片串流、遊戲 |
Socket 通訊流程
TCP 流程
Server Client
│ │
│ 1. socket() │
│ 2. bind() │
│ 3. listen() │
│ │ 1. socket()
│ 4. accept() ←─────────────── │ 2. connect()
│ (阻塞等待) │
│ │
│ 5. recv() ←──── 資料 ────── │ 3. send()
│ 6. send() ───── 資料 ──────→ │ 4. recv()
│ │
│ 7. close() ←─────────────── │ 5. close()
│ │1️⃣ TCP Socket 基礎
簡單的 TCP Server
import socket
def tcp_server():
# 1. 創建 Socket
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# 2. 綁定地址和端口
server_socket.bind(('localhost', 8888))
# 3. 開始監聽(最多 5 個等待連線)
server_socket.listen(5)
print("伺服器啟動,等待連線...")
# 4. 接受連線(阻塞)
client_socket, client_address = server_socket.accept()
print(f"客戶端連線: {client_address}")
# 5. 接收資料
data = client_socket.recv(1024)
print(f"收到: {data.decode()}")
# 6. 發送回應
client_socket.send(b"Hello from server")
# 7. 關閉連線
client_socket.close()
server_socket.close()
if __name__ == '__main__':
tcp_server()簡單的 TCP Client
import socket
def tcp_client():
# 1. 創建 Socket
client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# 2. 連線到伺服器
client_socket.connect(('localhost', 8888))
print("已連線到伺服器")
# 3. 發送資料
client_socket.send(b"Hello from client")
# 4. 接收回應
data = client_socket.recv(1024)
print(f"收到: {data.decode()}")
# 5. 關閉連線
client_socket.close()
if __name__ == '__main__':
tcp_client()完整範例:Server + Client
from multiprocessing import Process
import socket
import time
def server():
"""TCP Server Process"""
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server_socket.bind(('localhost', 9999))
server_socket.listen(5)
print("[Server] 啟動,等待連線...")
# 接受 3 個客戶端
for i in range(3):
client_socket, addr = server_socket.accept()
print(f"[Server] 客戶端 {i+1} 連線: {addr}")
data = client_socket.recv(1024)
print(f"[Server] 收到: {data.decode()}")
response = f"回應給客戶端 {i+1}"
client_socket.send(response.encode())
client_socket.close()
server_socket.close()
def client(client_id):
"""TCP Client Process"""
time.sleep(1) # 確保 Server 先啟動
client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client_socket.connect(('localhost', 9999))
message = f"你好,我是客戶端 {client_id}"
client_socket.send(message.encode())
print(f"[Client {client_id}] 已發送: {message}")
data = client_socket.recv(1024)
print(f"[Client {client_id}] 收到: {data.decode()}")
client_socket.close()
if __name__ == '__main__':
# 啟動 Server
server_process = Process(target=server)
server_process.start()
# 啟動 3 個 Client
clients = [Process(target=client, args=(i,)) for i in range(3)]
for c in clients:
c.start()
# 等待完成
server_process.join()
for c in clients:
c.join()
print("所有通訊完成")2️⃣ UDP Socket
UDP Server
import socket
def udp_server():
# 創建 UDP Socket
server_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
server_socket.bind(('localhost', 7777))
print("[UDP Server] 啟動,等待資料...")
# 接收資料(無需 accept)
for i in range(3):
data, client_address = server_socket.recvfrom(1024)
print(f"[Server] 收到來自 {client_address}: {data.decode()}")
# 發送回應
response = f"收到你的第 {i+1} 條訊息"
server_socket.sendto(response.encode(), client_address)
server_socket.close()
if __name__ == '__main__':
udp_server()UDP Client
import socket
def udp_client(client_id):
# 創建 UDP Socket
client_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# 發送資料(無需 connect)
message = f"UDP 訊息來自客戶端 {client_id}"
client_socket.sendto(message.encode(), ('localhost', 7777))
print(f"[Client {client_id}] 已發送: {message}")
# 接收回應
data, server_address = client_socket.recvfrom(1024)
print(f"[Client {client_id}] 收到: {data.decode()}")
client_socket.close()
if __name__ == '__main__':
from multiprocessing import Process
import time
# 啟動 Server
server_process = Process(target=udp_server)
server_process.start()
time.sleep(1)
# 啟動 Client
clients = [Process(target=udp_client, args=(i,)) for i in range(3)]
for c in clients:
c.start()
server_process.join()
for c in clients:
c.join()3️⃣ 多客戶端處理
方法 1:循序處理(單 Process)
import socket
def sequential_server():
"""循序處理每個客戶端"""
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind(('localhost', 8000))
server.listen(5)
print("[Server] 啟動(循序模式)")
try:
while True:
client, addr = server.accept()
print(f"[Server] 客戶端連線: {addr}")
# 處理客戶端(阻塞)
data = client.recv(1024)
print(f"[Server] 收到: {data.decode()}")
response = "已處理你的請求"
client.send(response.encode())
client.close()
except KeyboardInterrupt:
print("\n[Server] 關閉")
server.close()
if __name__ == '__main__':
sequential_server()問題: 同時只能處理一個客戶端,其他客戶端需要等待。
方法 2:多 Thread 處理
import socket
from threading import Thread
def handle_client(client_socket, client_address):
"""處理單個客戶端"""
print(f"[Thread] 處理客戶端: {client_address}")
data = client_socket.recv(1024)
print(f"[Thread] 收到: {data.decode()}")
import time
time.sleep(2) # 模擬處理
response = f"已處理來自 {client_address} 的請求"
client_socket.send(response.encode())
client_socket.close()
def threaded_server():
"""多 Thread 伺服器"""
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind(('localhost', 8001))
server.listen(5)
print("[Server] 啟動(多 Thread 模式)")
try:
while True:
client, addr = server.accept()
print(f"[Server] 接受連線: {addr}")
# 為每個客戶端創建 Thread
thread = Thread(target=handle_client, args=(client, addr))
thread.start()
except KeyboardInterrupt:
print("\n[Server] 關閉")
server.close()
if __name__ == '__main__':
threaded_server()方法 3:多 Process 處理
import socket
from multiprocessing import Process
def handle_client_process(client_socket, client_address):
"""Process 處理客戶端"""
print(f"[Process] 處理客戶端: {client_address}")
data = client_socket.recv(1024)
print(f"[Process] 收到: {data.decode()}")
import time
time.sleep(2)
response = f"已處理來自 {client_address} 的請求"
client_socket.send(response.encode())
client_socket.close()
def process_server():
"""多 Process 伺服器"""
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind(('localhost', 8002))
server.listen(5)
print("[Server] 啟動(多 Process 模式)")
try:
while True:
client, addr = server.accept()
print(f"[Server] 接受連線: {addr}")
# 為每個客戶端創建 Process
p = Process(target=handle_client_process, args=(client, addr))
p.start()
client.close() # 父 Process 關閉副本
except KeyboardInterrupt:
print("\n[Server] 關閉")
server.close()
if __name__ == '__main__':
process_server()4️⃣ 實戰案例
案例 1:簡單的 RPC(遠程過程調用)
import socket
import json
from multiprocessing import Process
import time
def rpc_server():
"""RPC Server"""
def add(a, b):
return a + b
def multiply(a, b):
return a * b
# 函式映射
functions = {
'add': add,
'multiply': multiply
}
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind(('localhost', 5000))
server.listen(5)
print("[RPC Server] 啟動")
while True:
client, addr = server.accept()
data = client.recv(1024).decode()
try:
# 解析請求
request = json.loads(data)
func_name = request['function']
args = request['args']
# 執行函式
result = functions[func_name](*args)
# 返回結果
response = json.dumps({'result': result})
client.send(response.encode())
except Exception as e:
error_response = json.dumps({'error': str(e)})
client.send(error_response.encode())
client.close()
def rpc_client():
"""RPC Client"""
time.sleep(1) # 等待 Server 啟動
# 呼叫 add
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.connect(('localhost', 5000))
request = json.dumps({'function': 'add', 'args': [5, 3]})
client.send(request.encode())
response = json.loads(client.recv(1024).decode())
print(f"[Client] add(5, 3) = {response['result']}")
client.close()
# 呼叫 multiply
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.connect(('localhost', 5000))
request = json.dumps({'function': 'multiply', 'args': [4, 7]})
client.send(request.encode())
response = json.loads(client.recv(1024).decode())
print(f"[Client] multiply(4, 7) = {response['result']}")
client.close()
if __name__ == '__main__':
server_process = Process(target=rpc_server)
server_process.daemon = True
server_process.start()
rpc_client()案例 2:任務分發系統
import socket
import json
from multiprocessing import Process
import time
def task_server():
"""任務分發伺服器"""
tasks = [
{'id': 1, 'type': 'compute', 'data': 100},
{'id': 2, 'type': 'compute', 'data': 200},
{'id': 3, 'type': 'compute', 'data': 300},
]
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind(('localhost', 6000))
server.listen(5)
print("[Task Server] 啟動,等待 Worker...")
# 分發任務給 Worker
for task in tasks:
client, addr = server.accept()
print(f"[Server] Worker 連線: {addr}")
# 發送任務
task_json = json.dumps(task)
client.send(task_json.encode())
print(f"[Server] 分發任務 {task['id']} 給 {addr}")
# 接收結果
result = client.recv(1024).decode()
result_data = json.loads(result)
print(f"[Server] 任務 {task['id']} 完成,結果: {result_data['result']}")
client.close()
server.close()
def worker(worker_id):
"""Worker Process"""
time.sleep(0.5) # 錯開連線
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.connect(('localhost', 6000))
print(f"[Worker {worker_id}] 連線到伺服器")
# 接收任務
task_data = client.recv(1024).decode()
task = json.loads(task_data)
print(f"[Worker {worker_id}] 收到任務: {task}")
# 執行任務
time.sleep(2) # 模擬計算
result = task['data'] * 2
# 返回結果
response = json.dumps({'task_id': task['id'], 'result': result})
client.send(response.encode())
print(f"[Worker {worker_id}] 完成任務 {task['id']}")
client.close()
if __name__ == '__main__':
# 啟動伺服器
server_process = Process(target=task_server)
server_process.start()
# 啟動 Worker
workers = [Process(target=worker, args=(i,)) for i in range(3)]
for w in workers:
w.start()
server_process.join()
for w in workers:
w.join()案例 3:聊天室
import socket
import threading
clients = []
clients_lock = threading.Lock()
def broadcast(message, sender_socket):
"""廣播訊息給所有客戶端"""
with clients_lock:
for client in clients:
if client != sender_socket:
try:
client.send(message)
except:
clients.remove(client)
def handle_client(client_socket, client_address):
"""處理客戶端"""
print(f"[Server] 新用戶加入: {client_address}")
# 廣播加入訊息
join_msg = f"{client_address} 加入聊天室\n".encode()
broadcast(join_msg, client_socket)
try:
while True:
data = client_socket.recv(1024)
if not data:
break
message = f"{client_address}: {data.decode()}"
print(f"[Server] {message}")
# 廣播給其他人
broadcast(message.encode(), client_socket)
except:
pass
finally:
with clients_lock:
clients.remove(client_socket)
client_socket.close()
leave_msg = f"{client_address} 離開聊天室\n".encode()
broadcast(leave_msg, client_socket)
def chat_server():
"""聊天室伺服器"""
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind(('localhost', 7000))
server.listen(5)
print("[Chat Server] 啟動")
try:
while True:
client, addr = server.accept()
with clients_lock:
clients.append(client)
thread = threading.Thread(target=handle_client, args=(client, addr))
thread.start()
except KeyboardInterrupt:
print("\n[Server] 關閉")
server.close()
if __name__ == '__main__':
chat_server()5️⃣ Socket 選項與優化
常用 Socket 選項
import socket
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# 1. 重用地址(避免 "Address already in use")
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# 2. 設置超時
server.settimeout(10) # 10 秒超時
# 3. 接收緩衝區大小
server.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 4096)
# 4. 發送緩衝區大小
server.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 4096)
# 5. TCP_NODELAY(禁用 Nagle 算法)
server.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)非阻塞 Socket
import socket
import select
def non_blocking_server():
"""非阻塞伺服器"""
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind(('localhost', 8888))
server.listen(5)
server.setblocking(False) # 設為非阻塞
print("[Server] 啟動(非阻塞模式)")
sockets_list = [server]
try:
while True:
# 使用 select 監控多個 Socket
readable, _, _ = select.select(sockets_list, [], [], 1)
for sock in readable:
if sock == server:
# 新連線
client, addr = server.accept()
client.setblocking(False)
sockets_list.append(client)
print(f"[Server] 新連線: {addr}")
else:
# 接收資料
try:
data = sock.recv(1024)
if data:
print(f"[Server] 收到: {data.decode()}")
sock.send(b"OK")
else:
sockets_list.remove(sock)
sock.close()
except:
sockets_list.remove(sock)
sock.close()
except KeyboardInterrupt:
print("\n[Server] 關閉")
server.close()
if __name__ == '__main__':
non_blocking_server()✅ 重點回顧
Socket 優勢:
- ✅ 可跨網路通訊(最靈活的 IPC)
- ✅ 標準化協定(TCP/UDP)
- ✅ 支援多種語言、平台
- ✅ 可擴展到分散式系統
TCP vs UDP:
- TCP:可靠、有序、面向連線(HTTP、檔案傳輸)
- UDP:快速、無連線、不保證送達(串流、遊戲)
多客戶端處理:
- 循序處理 - 簡單,但慢
- 多 Thread - 適合 I/O 密集
- 多 Process - 穩定、隔離
- 非阻塞 + select - 高效能
與其他 IPC 對比:
- Pipe/Queue - 本機限定,簡單
- Shared Memory - 最快,但複雜
- Socket - 跨網路,最靈活
適用場景:
- ✅ 分散式系統(微服務)
- ✅ Client-Server 架構
- ✅ 跨機器通訊
- ✅ 需要標準化協定
- ❌ 本機簡單通訊(用 Pipe/Queue 更簡單)
關鍵技巧:
- ✅ 使用
SO_REUSEADDR重用地址 - ✅ 設置合理的超時
- ✅ 處理連線異常
- ✅ 考慮使用非阻塞模式
上一篇: 03-4. Shared Memory(共享記憶體)
最後更新:2025-01-06