03-5. Socket 通訊

⏱️ 閱讀時間: 12 分鐘 🎯 難度: ⭐⭐⭐ (中等)


🎯 本篇重點

掌握 Socket 這種最靈活的 IPC 方式,學習如何使用 TCP/UDP 進行 Process 間通訊,甚至跨網路通訊。


🤔 什麼是 Socket?

Socket(套接字) = 網路通訊的端點

一句話解釋: Socket 是一種通訊介面,讓 Process 能夠透過網路協定(TCP/UDP)互相傳遞資料,可用於本機或跨機器通訊。


🏢 用通訊方式比喻

Pipe/Queue = 內線電話

同一棟大樓內的兩個辦公室
├─ 辦公室 A ──→ 內線 ──→ 辦公室 B
└─ 只能在同一棟樓使用

Socket = 手機/網路電話

不同地點的辦公室
├─ 台北辦公室 ──→ 網路 ──→ 高雄辦公室
├─ 可以跨城市、跨國家
└─ 使用標準化協定(TCP/UDP)

💻 Socket 基礎概念

TCP vs UDP

特性TCPUDP
連線方式面向連線無連線
可靠性可靠(保證送達)不可靠(可能遺失)
速度較慢較快
順序保證順序不保證順序
適用場景HTTP、檔案傳輸影片串流、遊戲

Socket 通訊流程

TCP 流程

Server                          Client
  │                               │
  │ 1. socket()                   │
  │ 2. bind()                     │
  │ 3. listen()                   │
  │                               │ 1. socket()
  │ 4. accept() ←─────────────── │ 2. connect()
  │      (阻塞等待)               │
  │                               │
  │ 5. recv() ←──── 資料 ────── │ 3. send()
  │ 6. send() ───── 資料 ──────→ │ 4. recv()
  │                               │
  │ 7. close() ←─────────────── │ 5. close()
  │                               │

1️⃣ TCP Socket 基礎

簡單的 TCP Server

import socket

def tcp_server():
    # 1. 創建 Socket
    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    # 2. 綁定地址和端口
    server_socket.bind(('localhost', 8888))

    # 3. 開始監聽(最多 5 個等待連線)
    server_socket.listen(5)
    print("伺服器啟動,等待連線...")

    # 4. 接受連線(阻塞)
    client_socket, client_address = server_socket.accept()
    print(f"客戶端連線: {client_address}")

    # 5. 接收資料
    data = client_socket.recv(1024)
    print(f"收到: {data.decode()}")

    # 6. 發送回應
    client_socket.send(b"Hello from server")

    # 7. 關閉連線
    client_socket.close()
    server_socket.close()

if __name__ == '__main__':
    tcp_server()

簡單的 TCP Client

import socket

def tcp_client():
    # 1. 創建 Socket
    client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    # 2. 連線到伺服器
    client_socket.connect(('localhost', 8888))
    print("已連線到伺服器")

    # 3. 發送資料
    client_socket.send(b"Hello from client")

    # 4. 接收回應
    data = client_socket.recv(1024)
    print(f"收到: {data.decode()}")

    # 5. 關閉連線
    client_socket.close()

if __name__ == '__main__':
    tcp_client()

完整範例:Server + Client

from multiprocessing import Process
import socket
import time

def server():
    """TCP Server Process"""
    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server_socket.bind(('localhost', 9999))
    server_socket.listen(5)
    print("[Server] 啟動,等待連線...")

    # 接受 3 個客戶端
    for i in range(3):
        client_socket, addr = server_socket.accept()
        print(f"[Server] 客戶端 {i+1} 連線: {addr}")

        data = client_socket.recv(1024)
        print(f"[Server] 收到: {data.decode()}")

        response = f"回應給客戶端 {i+1}"
        client_socket.send(response.encode())
        client_socket.close()

    server_socket.close()

def client(client_id):
    """TCP Client Process"""
    time.sleep(1)  # 確保 Server 先啟動

    client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    client_socket.connect(('localhost', 9999))

    message = f"你好,我是客戶端 {client_id}"
    client_socket.send(message.encode())
    print(f"[Client {client_id}] 已發送: {message}")

    data = client_socket.recv(1024)
    print(f"[Client {client_id}] 收到: {data.decode()}")

    client_socket.close()

if __name__ == '__main__':
    # 啟動 Server
    server_process = Process(target=server)
    server_process.start()

    # 啟動 3 個 Client
    clients = [Process(target=client, args=(i,)) for i in range(3)]
    for c in clients:
        c.start()

    # 等待完成
    server_process.join()
    for c in clients:
        c.join()

    print("所有通訊完成")

2️⃣ UDP Socket

UDP Server

import socket

def udp_server():
    # 創建 UDP Socket
    server_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    server_socket.bind(('localhost', 7777))
    print("[UDP Server] 啟動,等待資料...")

    # 接收資料(無需 accept)
    for i in range(3):
        data, client_address = server_socket.recvfrom(1024)
        print(f"[Server] 收到來自 {client_address}: {data.decode()}")

        # 發送回應
        response = f"收到你的第 {i+1} 條訊息"
        server_socket.sendto(response.encode(), client_address)

    server_socket.close()

if __name__ == '__main__':
    udp_server()

UDP Client

import socket

def udp_client(client_id):
    # 創建 UDP Socket
    client_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

    # 發送資料(無需 connect)
    message = f"UDP 訊息來自客戶端 {client_id}"
    client_socket.sendto(message.encode(), ('localhost', 7777))
    print(f"[Client {client_id}] 已發送: {message}")

    # 接收回應
    data, server_address = client_socket.recvfrom(1024)
    print(f"[Client {client_id}] 收到: {data.decode()}")

    client_socket.close()

if __name__ == '__main__':
    from multiprocessing import Process
    import time

    # 啟動 Server
    server_process = Process(target=udp_server)
    server_process.start()
    time.sleep(1)

    # 啟動 Client
    clients = [Process(target=udp_client, args=(i,)) for i in range(3)]
    for c in clients:
        c.start()

    server_process.join()
    for c in clients:
        c.join()

3️⃣ 多客戶端處理

方法 1:循序處理(單 Process)

import socket

def sequential_server():
    """循序處理每個客戶端"""
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server.bind(('localhost', 8000))
    server.listen(5)
    print("[Server] 啟動(循序模式)")

    try:
        while True:
            client, addr = server.accept()
            print(f"[Server] 客戶端連線: {addr}")

            # 處理客戶端(阻塞)
            data = client.recv(1024)
            print(f"[Server] 收到: {data.decode()}")

            response = "已處理你的請求"
            client.send(response.encode())
            client.close()

    except KeyboardInterrupt:
        print("\n[Server] 關閉")
        server.close()

if __name__ == '__main__':
    sequential_server()

問題: 同時只能處理一個客戶端,其他客戶端需要等待。


方法 2:多 Thread 處理

import socket
from threading import Thread

def handle_client(client_socket, client_address):
    """處理單個客戶端"""
    print(f"[Thread] 處理客戶端: {client_address}")

    data = client_socket.recv(1024)
    print(f"[Thread] 收到: {data.decode()}")

    import time
    time.sleep(2)  # 模擬處理

    response = f"已處理來自 {client_address} 的請求"
    client_socket.send(response.encode())
    client_socket.close()

def threaded_server():
    """多 Thread 伺服器"""
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server.bind(('localhost', 8001))
    server.listen(5)
    print("[Server] 啟動(多 Thread 模式)")

    try:
        while True:
            client, addr = server.accept()
            print(f"[Server] 接受連線: {addr}")

            # 為每個客戶端創建 Thread
            thread = Thread(target=handle_client, args=(client, addr))
            thread.start()

    except KeyboardInterrupt:
        print("\n[Server] 關閉")
        server.close()

if __name__ == '__main__':
    threaded_server()

方法 3:多 Process 處理

import socket
from multiprocessing import Process

def handle_client_process(client_socket, client_address):
    """Process 處理客戶端"""
    print(f"[Process] 處理客戶端: {client_address}")

    data = client_socket.recv(1024)
    print(f"[Process] 收到: {data.decode()}")

    import time
    time.sleep(2)

    response = f"已處理來自 {client_address} 的請求"
    client_socket.send(response.encode())
    client_socket.close()

def process_server():
    """多 Process 伺服器"""
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server.bind(('localhost', 8002))
    server.listen(5)
    print("[Server] 啟動(多 Process 模式)")

    try:
        while True:
            client, addr = server.accept()
            print(f"[Server] 接受連線: {addr}")

            # 為每個客戶端創建 Process
            p = Process(target=handle_client_process, args=(client, addr))
            p.start()
            client.close()  # 父 Process 關閉副本

    except KeyboardInterrupt:
        print("\n[Server] 關閉")
        server.close()

if __name__ == '__main__':
    process_server()

4️⃣ 實戰案例

案例 1:簡單的 RPC(遠程過程調用)

import socket
import json
from multiprocessing import Process
import time

def rpc_server():
    """RPC Server"""
    def add(a, b):
        return a + b

    def multiply(a, b):
        return a * b

    # 函式映射
    functions = {
        'add': add,
        'multiply': multiply
    }

    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server.bind(('localhost', 5000))
    server.listen(5)
    print("[RPC Server] 啟動")

    while True:
        client, addr = server.accept()
        data = client.recv(1024).decode()

        try:
            # 解析請求
            request = json.loads(data)
            func_name = request['function']
            args = request['args']

            # 執行函式
            result = functions[func_name](*args)

            # 返回結果
            response = json.dumps({'result': result})
            client.send(response.encode())

        except Exception as e:
            error_response = json.dumps({'error': str(e)})
            client.send(error_response.encode())

        client.close()

def rpc_client():
    """RPC Client"""
    time.sleep(1)  # 等待 Server 啟動

    # 呼叫 add
    client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    client.connect(('localhost', 5000))

    request = json.dumps({'function': 'add', 'args': [5, 3]})
    client.send(request.encode())

    response = json.loads(client.recv(1024).decode())
    print(f"[Client] add(5, 3) = {response['result']}")
    client.close()

    # 呼叫 multiply
    client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    client.connect(('localhost', 5000))

    request = json.dumps({'function': 'multiply', 'args': [4, 7]})
    client.send(request.encode())

    response = json.loads(client.recv(1024).decode())
    print(f"[Client] multiply(4, 7) = {response['result']}")
    client.close()

if __name__ == '__main__':
    server_process = Process(target=rpc_server)
    server_process.daemon = True
    server_process.start()

    rpc_client()

案例 2:任務分發系統

import socket
import json
from multiprocessing import Process
import time

def task_server():
    """任務分發伺服器"""
    tasks = [
        {'id': 1, 'type': 'compute', 'data': 100},
        {'id': 2, 'type': 'compute', 'data': 200},
        {'id': 3, 'type': 'compute', 'data': 300},
    ]

    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server.bind(('localhost', 6000))
    server.listen(5)
    print("[Task Server] 啟動,等待 Worker...")

    # 分發任務給 Worker
    for task in tasks:
        client, addr = server.accept()
        print(f"[Server] Worker 連線: {addr}")

        # 發送任務
        task_json = json.dumps(task)
        client.send(task_json.encode())
        print(f"[Server] 分發任務 {task['id']}{addr}")

        # 接收結果
        result = client.recv(1024).decode()
        result_data = json.loads(result)
        print(f"[Server] 任務 {task['id']} 完成,結果: {result_data['result']}")

        client.close()

    server.close()

def worker(worker_id):
    """Worker Process"""
    time.sleep(0.5)  # 錯開連線

    client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    client.connect(('localhost', 6000))
    print(f"[Worker {worker_id}] 連線到伺服器")

    # 接收任務
    task_data = client.recv(1024).decode()
    task = json.loads(task_data)
    print(f"[Worker {worker_id}] 收到任務: {task}")

    # 執行任務
    time.sleep(2)  # 模擬計算
    result = task['data'] * 2

    # 返回結果
    response = json.dumps({'task_id': task['id'], 'result': result})
    client.send(response.encode())
    print(f"[Worker {worker_id}] 完成任務 {task['id']}")

    client.close()

if __name__ == '__main__':
    # 啟動伺服器
    server_process = Process(target=task_server)
    server_process.start()

    # 啟動 Worker
    workers = [Process(target=worker, args=(i,)) for i in range(3)]
    for w in workers:
        w.start()

    server_process.join()
    for w in workers:
        w.join()

案例 3:聊天室

import socket
import threading

clients = []
clients_lock = threading.Lock()

def broadcast(message, sender_socket):
    """廣播訊息給所有客戶端"""
    with clients_lock:
        for client in clients:
            if client != sender_socket:
                try:
                    client.send(message)
                except:
                    clients.remove(client)

def handle_client(client_socket, client_address):
    """處理客戶端"""
    print(f"[Server] 新用戶加入: {client_address}")

    # 廣播加入訊息
    join_msg = f"{client_address} 加入聊天室\n".encode()
    broadcast(join_msg, client_socket)

    try:
        while True:
            data = client_socket.recv(1024)
            if not data:
                break

            message = f"{client_address}: {data.decode()}"
            print(f"[Server] {message}")

            # 廣播給其他人
            broadcast(message.encode(), client_socket)

    except:
        pass
    finally:
        with clients_lock:
            clients.remove(client_socket)
        client_socket.close()
        leave_msg = f"{client_address} 離開聊天室\n".encode()
        broadcast(leave_msg, client_socket)

def chat_server():
    """聊天室伺服器"""
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server.bind(('localhost', 7000))
    server.listen(5)
    print("[Chat Server] 啟動")

    try:
        while True:
            client, addr = server.accept()
            with clients_lock:
                clients.append(client)

            thread = threading.Thread(target=handle_client, args=(client, addr))
            thread.start()

    except KeyboardInterrupt:
        print("\n[Server] 關閉")
        server.close()

if __name__ == '__main__':
    chat_server()

5️⃣ Socket 選項與優化

常用 Socket 選項

import socket

server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

# 1. 重用地址(避免 "Address already in use")
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

# 2. 設置超時
server.settimeout(10)  # 10 秒超時

# 3. 接收緩衝區大小
server.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 4096)

# 4. 發送緩衝區大小
server.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 4096)

# 5. TCP_NODELAY(禁用 Nagle 算法)
server.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)

非阻塞 Socket

import socket
import select

def non_blocking_server():
    """非阻塞伺服器"""
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server.bind(('localhost', 8888))
    server.listen(5)
    server.setblocking(False)  # 設為非阻塞
    print("[Server] 啟動(非阻塞模式)")

    sockets_list = [server]

    try:
        while True:
            # 使用 select 監控多個 Socket
            readable, _, _ = select.select(sockets_list, [], [], 1)

            for sock in readable:
                if sock == server:
                    # 新連線
                    client, addr = server.accept()
                    client.setblocking(False)
                    sockets_list.append(client)
                    print(f"[Server] 新連線: {addr}")
                else:
                    # 接收資料
                    try:
                        data = sock.recv(1024)
                        if data:
                            print(f"[Server] 收到: {data.decode()}")
                            sock.send(b"OK")
                        else:
                            sockets_list.remove(sock)
                            sock.close()
                    except:
                        sockets_list.remove(sock)
                        sock.close()

    except KeyboardInterrupt:
        print("\n[Server] 關閉")
        server.close()

if __name__ == '__main__':
    non_blocking_server()

✅ 重點回顧

Socket 優勢:

  • ✅ 可跨網路通訊(最靈活的 IPC)
  • ✅ 標準化協定(TCP/UDP)
  • ✅ 支援多種語言、平台
  • ✅ 可擴展到分散式系統

TCP vs UDP:

  • TCP:可靠、有序、面向連線(HTTP、檔案傳輸)
  • UDP:快速、無連線、不保證送達(串流、遊戲)

多客戶端處理:

  1. 循序處理 - 簡單,但慢
  2. 多 Thread - 適合 I/O 密集
  3. 多 Process - 穩定、隔離
  4. 非阻塞 + select - 高效能

與其他 IPC 對比:

  • Pipe/Queue - 本機限定,簡單
  • Shared Memory - 最快,但複雜
  • Socket - 跨網路,最靈活

適用場景:

  • ✅ 分散式系統(微服務)
  • ✅ Client-Server 架構
  • ✅ 跨機器通訊
  • ✅ 需要標準化協定
  • ❌ 本機簡單通訊(用 Pipe/Queue 更簡單)

關鍵技巧:

  • ✅ 使用 SO_REUSEADDR 重用地址
  • ✅ 設置合理的超時
  • ✅ 處理連線異常
  • ✅ 考慮使用非阻塞模式

上一篇: 03-4. Shared Memory(共享記憶體)


最後更新:2025-01-06

0%